From 384a30a735c64ac0c06de038f52b7bbfa7f39e15 Mon Sep 17 00:00:00 2001 From: Bikalpa Dhakal Date: Thu, 15 Aug 2024 07:58:05 +0000 Subject: [PATCH 1/8] WIP --- .../data_platform_libs/v0/data_interfaces.py | 507 +++++++++++++----- metadata.yaml | 3 + src/charm.py | 2 + src/config/kyuubi.py | 26 +- src/constants.py | 1 + src/core/context.py | 19 +- src/core/domain.py | 108 ++++ src/events/base.py | 25 +- src/events/zookeeper.py | 59 ++ src/managers/kyuubi.py | 5 +- 10 files changed, 614 insertions(+), 141 deletions(-) create mode 100644 src/events/zookeeper.py diff --git a/lib/charms/data_platform_libs/v0/data_interfaces.py b/lib/charms/data_platform_libs/v0/data_interfaces.py index 59a9722..aaed2e5 100644 --- a/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -331,10 +331,14 @@ def _on_topic_requested(self, event: TopicRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 37 +LIBPATCH = 39 PYDEPS = ["ops>=2.0.0"] +# Starting from what LIBPATCH number to apply legacy solutions +# v0.17 was the last version without secrets +LEGACY_SUPPORT_FROM = 17 + logger = logging.getLogger(__name__) Diff = namedtuple("Diff", "added changed deleted") @@ -351,36 +355,16 @@ def _on_topic_requested(self, event: TopicRequestedEvent): GROUP_MAPPING_FIELD = "secret_group_mapping" GROUP_SEPARATOR = "@" +MODEL_ERRORS = { + "not_leader": "this unit is not the leader", + "no_label_and_uri": "ERROR either URI or label should be used for getting an owned secret but not both", + "owner_no_refresh": "ERROR secret owner cannot use --refresh", +} -class SecretGroup(str): - """Secret groups specific type.""" - - -class SecretGroupsAggregate(str): - """Secret groups with option to extend with additional constants.""" - def __init__(self): - self.USER = SecretGroup("user") - self.TLS = SecretGroup("tls") - self.EXTRA = SecretGroup("extra") - - def __setattr__(self, name, value): - """Setting internal constants.""" - if name in self.__dict__: - raise RuntimeError("Can't set constant!") - else: - super().__setattr__(name, SecretGroup(value)) - - def groups(self) -> list: - """Return the list of stored SecretGroups.""" - return list(self.__dict__.values()) - - def get_group(self, group: str) -> Optional[SecretGroup]: - """If the input str translates to a group name, return that.""" - return SecretGroup(group) if group in self.groups() else None - - -SECRET_GROUPS = SecretGroupsAggregate() +############################################################################## +# Exceptions +############################################################################## class DataInterfacesError(Exception): @@ -407,6 +391,15 @@ class IllegalOperationError(DataInterfacesError): """To be used when an operation is not allowed to be performed.""" +############################################################################## +# Global helpers / utilities +############################################################################## + +############################################################################## +# Databag handling and comparison methods +############################################################################## + + def get_encoded_dict( relation: Relation, member: Union[Unit, Application], field: str ) -> Optional[Dict[str, str]]: @@ -482,6 +475,11 @@ def diff(event: RelationChangedEvent, bucket: Optional[Union[Unit, Application]] return Diff(added, changed, 
deleted) +############################################################################## +# Module decorators +############################################################################## + + def leader_only(f): """Decorator to ensure that only leader can perform given operation.""" @@ -536,6 +534,36 @@ def wrapper(self, *args, **kwargs): return wrapper +def legacy_apply_from_version(version: int) -> Callable: + """Decorator to decide whether to apply a legacy function or not. + + Based on LEGACY_SUPPORT_FROM module variable value, the importer charm may only want + to apply legacy solutions starting from a specific LIBPATCH. + + NOTE: All 'legacy' functions have to be defined and called in a way that they return `None`. + This results in cleaner and more secure execution flows in case the function may be disabled. + This requirement implicitly means that legacy functions change the internal state strictly, + don't return information. + """ + + def decorator(f: Callable[..., None]): + """Signature is ensuring None return value.""" + f.legacy_version = version + + def wrapper(self, *args, **kwargs) -> None: + if version >= LEGACY_SUPPORT_FROM: + return f(self, *args, **kwargs) + + return wrapper + + return decorator + + +############################################################################## +# Helper classes +############################################################################## + + class Scope(Enum): """Peer relations scope.""" @@ -543,9 +571,35 @@ class Scope(Enum): UNIT = "unit" -################################################################################ -# Secrets internal caching -################################################################################ +class SecretGroup(str): + """Secret groups specific type.""" + + +class SecretGroupsAggregate(str): + """Secret groups with option to extend with additional constants.""" + + def __init__(self): + self.USER = SecretGroup("user") + self.TLS = SecretGroup("tls") + self.EXTRA = SecretGroup("extra") + + def __setattr__(self, name, value): + """Setting internal constants.""" + if name in self.__dict__: + raise RuntimeError("Can't set constant!") + else: + super().__setattr__(name, SecretGroup(value)) + + def groups(self) -> list: + """Return the list of stored SecretGroups.""" + return list(self.__dict__.values()) + + def get_group(self, group: str) -> Optional[SecretGroup]: + """If the input str translates to a group name, return that.""" + return SecretGroup(group) if group in self.groups() else None + + +SECRET_GROUPS = SecretGroupsAggregate() class CachedSecret: @@ -554,6 +608,8 @@ class CachedSecret: The data structure is precisely re-using/simulating as in the actual Secret Storage """ + KNOWN_MODEL_ERRORS = [MODEL_ERRORS["no_label_and_uri"], MODEL_ERRORS["owner_no_refresh"]] + def __init__( self, model: Model, @@ -571,6 +627,95 @@ def __init__( self.legacy_labels = legacy_labels self.current_label = None + @property + def meta(self) -> Optional[Secret]: + """Getting cached secret meta-information.""" + if not self._secret_meta: + if not (self._secret_uri or self.label): + return + + try: + self._secret_meta = self._model.get_secret(label=self.label) + except SecretNotFoundError: + # Falling back to seeking for potential legacy labels + self._legacy_compat_find_secret_by_old_label() + + # If still not found, to be checked by URI, to be labelled with the proposed label + if not self._secret_meta and self._secret_uri: + self._secret_meta = self._model.get_secret(id=self._secret_uri, label=self.label) + 
return self._secret_meta + + ########################################################################## + # Backwards compatibility / Upgrades + ########################################################################## + # These functions are used to keep backwards compatibility on rolling upgrades + # Policy: + # All data is kept intact until the first write operation. (This allows a minimal + # grace period during which rollbacks are fully safe. For more info see the spec.) + # All data involves: + # - databag contents + # - secrets content + # - secret labels (!!!) + # Legacy functions must return None, and leave an equally consistent state whether + # they are executed or skipped (as a high enough versioned execution environment may + # not require so) + + # Compatibility + + @legacy_apply_from_version(34) + def _legacy_compat_find_secret_by_old_label(self) -> None: + """Compatibility function, allowing to find a secret by a legacy label. + + This functionality is typically needed when secret labels changed over an upgrade. + Until the first write operation, we need to maintain data as it was, including keeping + the old secret label. In order to keep track of the old label currently used to access + the secret, and additional 'current_label' field is being defined. + """ + for label in self.legacy_labels: + try: + self._secret_meta = self._model.get_secret(label=label) + except SecretNotFoundError: + pass + else: + if label != self.label: + self.current_label = label + return + + # Migrations + + @legacy_apply_from_version(34) + def _legacy_migration_to_new_label_if_needed(self) -> None: + """Helper function to re-create the secret with a different label. + + Juju does not provide a way to change secret labels. + Thus whenever moving from secrets version that involves secret label changes, + we "re-create" the existing secret, and attach the new label to the new + secret, to be used from then on. + + Note: we replace the old secret with a new one "in place", as we can't + easily switch the containing SecretCache structure to point to a new secret. + Instead we are changing the 'self' (CachedSecret) object to point to the + new instance. + """ + if not self.current_label or not (self.meta and self._secret_meta): + return + + # Create a new secret with the new label + content = self._secret_meta.get_content() + self._secret_uri = None + + # It will be nice to have the possibility to check if we are the owners of the secret... 
+ try: + self._secret_meta = self.add_secret(content, label=self.label) + except ModelError as err: + if MODEL_ERRORS["not_leader"] not in str(err): + raise + self.current_label = None + + ########################################################################## + # Public functions + ########################################################################## + def add_secret( self, content: Dict[str, str], @@ -593,28 +738,6 @@ def add_secret( self._secret_meta = secret return self._secret_meta - @property - def meta(self) -> Optional[Secret]: - """Getting cached secret meta-information.""" - if not self._secret_meta: - if not (self._secret_uri or self.label): - return - - for label in [self.label] + self.legacy_labels: - try: - self._secret_meta = self._model.get_secret(label=label) - except SecretNotFoundError: - pass - else: - if label != self.label: - self.current_label = label - break - - # If still not found, to be checked by URI, to be labelled with the proposed label - if not self._secret_meta and self._secret_uri: - self._secret_meta = self._model.get_secret(id=self._secret_uri, label=self.label) - return self._secret_meta - def get_content(self) -> Dict[str, str]: """Getting cached secret content.""" if not self._secret_content: @@ -624,35 +747,14 @@ def get_content(self) -> Dict[str, str]: except (ValueError, ModelError) as err: # https://bugs.launchpad.net/juju/+bug/2042596 # Only triggered when 'refresh' is set - known_model_errors = [ - "ERROR either URI or label should be used for getting an owned secret but not both", - "ERROR secret owner cannot use --refresh", - ] if isinstance(err, ModelError) and not any( - msg in str(err) for msg in known_model_errors + msg in str(err) for msg in self.KNOWN_MODEL_ERRORS ): raise # Due to: ValueError: Secret owner cannot use refresh=True self._secret_content = self.meta.get_content() return self._secret_content - def _move_to_new_label_if_needed(self): - """Helper function to re-create the secret with a different label.""" - if not self.current_label or not (self.meta and self._secret_meta): - return - - # Create a new secret with the new label - content = self._secret_meta.get_content() - self._secret_uri = None - - # I wish we could just check if we are the owners of the secret... - try: - self._secret_meta = self.add_secret(content, label=self.label) - except ModelError as err: - if "this unit is not the leader" not in str(err): - raise - self.current_label = None - def set_content(self, content: Dict[str, str]) -> None: """Setting cached secret content.""" if not self.meta: @@ -663,7 +765,7 @@ def set_content(self, content: Dict[str, str]) -> None: return if content: - self._move_to_new_label_if_needed() + self._legacy_migration_to_new_label_if_needed() self.meta.set_content(content) self._secret_content = content else: @@ -926,6 +1028,23 @@ def _delete_relation_data(self, relation: Relation, fields: List[str]) -> None: """Delete data available (directily or indirectly -- i.e. secrets) from the relation for owner/this_app.""" raise NotImplementedError + # Optional overrides + + def _legacy_apply_on_fetch(self) -> None: + """This function should provide a list of compatibility functions to be applied when fetching (legacy) data.""" + pass + + def _legacy_apply_on_update(self, fields: List[str]) -> None: + """This function should provide a list of compatibility functions to be applied when writing data. + + Since data may be at a legacy version, migration may be mandatory. 
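To make the override contract concrete, here is a minimal, hypothetical sketch (the subclass and shim names are illustrative, not part of data_interfaces.py) of how a concrete Data subclass can chain version-gated compatibility shims through these hooks:

```python
# Hypothetical sketch only -- illustrates the hook/decorator contract,
# not an actual class from the library.
class MyRequirerData(Data):
    def _legacy_apply_on_fetch(self) -> None:
        # run every compatibility shim relevant to read operations
        self._compat_restore_pre_secret_field()

    @legacy_apply_from_version(38)  # assumed: shim first needed at LIBPATCH 38
    def _compat_restore_pre_secret_field(self) -> None:
        # legacy functions must return None and only adjust internal state;
        # the decorator silently skips the body when the shim is disabled
        ...
```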
+ """ + pass + + def _legacy_apply_on_delete(self, fields: List[str]) -> None: + """This function should provide a list of compatibility functions to be applied when deleting (legacy) data.""" + pass + # Internal helper methods @staticmethod @@ -1178,6 +1297,16 @@ def get_relation(self, relation_name, relation_id) -> Relation: return relation + def get_secret_uri(self, relation: Relation, group: SecretGroup) -> Optional[str]: + """Get the secret URI for the corresponding group.""" + secret_field = self._generate_secret_field_name(group) + return relation.data[self.component].get(secret_field) + + def set_secret_uri(self, relation: Relation, group: SecretGroup, secret_uri: str) -> None: + """Set the secret URI for the corresponding group.""" + secret_field = self._generate_secret_field_name(group) + relation.data[self.component][secret_field] = secret_uri + def fetch_relation_data( self, relation_ids: Optional[List[int]] = None, @@ -1194,6 +1323,8 @@ def fetch_relation_data( a dict of the values stored in the relation data bag for all relation instances (indexed by the relation ID). """ + self._legacy_apply_on_fetch() + if not relation_name: relation_name = self.relation_name @@ -1232,6 +1363,8 @@ def fetch_my_relation_data( NOTE: Since only the leader can read the relation's 'this_app'-side Application databag, the functionality is limited to leaders """ + self._legacy_apply_on_fetch() + if not relation_name: relation_name = self.relation_name @@ -1263,6 +1396,8 @@ def fetch_my_relation_field( @leader_only def update_relation_data(self, relation_id: int, data: dict) -> None: """Update the data within the relation.""" + self._legacy_apply_on_update(list(data.keys())) + relation_name = self.relation_name relation = self.get_relation(relation_name, relation_id) return self._update_relation_data(relation, data) @@ -1270,6 +1405,8 @@ def update_relation_data(self, relation_id: int, data: dict) -> None: @leader_only def delete_relation_data(self, relation_id: int, fields: List[str]) -> None: """Remove field from the relation.""" + self._legacy_apply_on_delete(fields) + relation_name = self.relation_name relation = self.get_relation(relation_name, relation_id) return self._delete_relation_data(relation, fields) @@ -1336,8 +1473,7 @@ def _add_relation_secret( uri_to_databag=True, ) -> bool: """Add a new Juju Secret that will be registered in the relation databag.""" - secret_field = self._generate_secret_field_name(group_mapping) - if uri_to_databag and relation.data[self.component].get(secret_field): + if uri_to_databag and self.get_secret_uri(relation, group_mapping): logging.error("Secret for relation %s already exists, not adding again", relation.id) return False @@ -1348,7 +1484,7 @@ def _add_relation_secret( # According to lint we may not have a Secret ID if uri_to_databag and secret.meta and secret.meta.id: - relation.data[self.component][secret_field] = secret.meta.id + self.set_secret_uri(relation, group_mapping, secret.meta.id) # Return the content that was added return True @@ -1449,8 +1585,7 @@ def _get_relation_secret( if not relation: return - secret_field = self._generate_secret_field_name(group_mapping) - if secret_uri := relation.data[self.local_app].get(secret_field): + if secret_uri := self.get_secret_uri(relation, group_mapping): return self.secrets.get(label, secret_uri) def _fetch_specific_relation_data( @@ -1603,11 +1738,10 @@ def _register_secrets_to_relation(self, relation: Relation, params_name_list: Li for group in SECRET_GROUPS.groups(): secret_field = 
self._generate_secret_field_name(group) - if secret_field in params_name_list: - if secret_uri := relation.data[relation.app].get(secret_field): - self._register_secret_to_relation( - relation.name, relation.id, secret_uri, group - ) + if secret_field in params_name_list and ( + secret_uri := self.get_secret_uri(relation, group) + ): + self._register_secret_to_relation(relation.name, relation.id, secret_uri, group) def _is_resource_created_for_relation(self, relation: Relation) -> bool: if not relation.app: @@ -1618,6 +1752,17 @@ def _is_resource_created_for_relation(self, relation: Relation) -> bool: ) return bool(data.get("username")) and bool(data.get("password")) + # Public functions + + def get_secret_uri(self, relation: Relation, group: SecretGroup) -> Optional[str]: + """Getting relation secret URI for the corresponding Secret Group.""" + secret_field = self._generate_secret_field_name(group) + return relation.data[relation.app].get(secret_field) + + def set_secret_uri(self, relation: Relation, group: SecretGroup, uri: str) -> None: + """Setting relation secret URI is not possible for a Requirer.""" + raise NotImplementedError("Requirer can not change the relation secret URI.") + def is_resource_created(self, relation_id: Optional[int] = None) -> bool: """Check if the resource has been created. @@ -1768,7 +1913,6 @@ def __init__( secret_field_name: Optional[str] = None, deleted_label: Optional[str] = None, ): - """Manager of base client relations.""" RequirerData.__init__( self, model, @@ -1779,6 +1923,11 @@ def __init__( self.secret_field_name = secret_field_name if secret_field_name else self.SECRET_FIELD_NAME self.deleted_label = deleted_label self._secret_label_map = {} + + # Legacy information holders + self._legacy_labels = [] + self._legacy_secret_uri = None + # Secrets that are being dynamically added within the scope of this event handler run self._new_secrets = [] self._additional_secret_group_mapping = additional_secret_group_mapping @@ -1853,10 +2002,12 @@ def set_secret( value: The string value of the secret group_mapping: The name of the "secret group", in case the field is to be added to an existing secret """ + self._legacy_apply_on_update([field]) + full_field = self._field_to_internal_name(field, group_mapping) if self.secrets_enabled and full_field not in self.current_secret_fields: self._new_secrets.append(full_field) - if self._no_group_with_databag(field, full_field): + if self.valid_field_pattern(field, full_field): self.update_relation_data(relation_id, {full_field: value}) # Unlike for set_secret(), there's no harm using this operation with static secrets @@ -1869,6 +2020,8 @@ def get_secret( group_mapping: Optional[SecretGroup] = None, ) -> Optional[str]: """Public interface method to fetch secrets only.""" + self._legacy_apply_on_fetch() + full_field = self._field_to_internal_name(field, group_mapping) if ( self.secrets_enabled @@ -1876,7 +2029,7 @@ def get_secret( and field not in self.current_secret_fields ): return - if self._no_group_with_databag(field, full_field): + if self.valid_field_pattern(field, full_field): return self.fetch_my_relation_field(relation_id, full_field) @dynamic_secrets_only @@ -1887,14 +2040,19 @@ def delete_secret( group_mapping: Optional[SecretGroup] = None, ) -> Optional[str]: """Public interface method to delete secrets only.""" + self._legacy_apply_on_delete([field]) + full_field = self._field_to_internal_name(field, group_mapping) if self.secrets_enabled and full_field not in self.current_secret_fields: 
logger.warning(f"Secret {field} from group {group_mapping} was not found") return - if self._no_group_with_databag(field, full_field): + + if self.valid_field_pattern(field, full_field): self.delete_relation_data(relation_id, [full_field]) + ########################################################################## # Helpers + ########################################################################## @staticmethod def _field_to_internal_name(field: str, group: Optional[SecretGroup]) -> str: @@ -1936,10 +2094,69 @@ def _content_for_secret_group( if k in self.secret_fields } - # Backwards compatibility + def valid_field_pattern(self, field: str, full_field: str) -> bool: + """Check that no secret group is attempted to be used together without secrets being enabled. + + Secrets groups are impossible to use with versions that are not yet supporting secrets. + """ + if not self.secrets_enabled and full_field != field: + logger.error( + f"Can't access {full_field}: no secrets available (i.e. no secret groups either)." + ) + return False + return True + + ########################################################################## + # Backwards compatibility / Upgrades + ########################################################################## + # These functions are used to keep backwards compatibility on upgrades + # Policy: + # All data is kept intact until the first write operation. (This allows a minimal + # grace period during which rollbacks are fully safe. For more info see spec.) + # All data involves: + # - databag + # - secrets content + # - secret labels (!!!) + # Legacy functions must return None, and leave an equally consistent state whether + # they are executed or skipped (as a high enough versioned execution environment may + # not require so) + + # Full legacy stack for each operation + + def _legacy_apply_on_fetch(self) -> None: + """All legacy functions to be applied on fetch.""" + relation = self._model.relations[self.relation_name][0] + self._legacy_compat_generate_prev_labels() + self._legacy_compat_secret_uri_from_databag(relation) + + def _legacy_apply_on_update(self, fields) -> None: + """All legacy functions to be applied on update.""" + relation = self._model.relations[self.relation_name][0] + self._legacy_compat_generate_prev_labels() + self._legacy_compat_secret_uri_from_databag(relation) + self._legacy_migration_remove_secret_from_databag(relation, fields) + self._legacy_migration_remove_secret_field_name_from_databag(relation) + + def _legacy_apply_on_delete(self, fields) -> None: + """All legacy functions to be applied on delete.""" + relation = self._model.relations[self.relation_name][0] + self._legacy_compat_generate_prev_labels() + self._legacy_compat_secret_uri_from_databag(relation) + self._legacy_compat_check_deleted_label(relation, fields) + + # Compatibility + + @legacy_apply_from_version(18) + def _legacy_compat_check_deleted_label(self, relation, fields) -> None: + """Helper function for legacy behavior. + + As long as https://bugs.launchpad.net/juju/+bug/2028094 wasn't fixed, + we did not delete fields but rather kept them in the secret with a string value + expressing invalidity. This function is maintainnig that behavior when needed. 
+ """ + if not self.deleted_label: + return - def _check_deleted_label(self, relation, fields) -> None: - """Helper function for legacy behavior.""" current_data = self.fetch_my_relation_data([relation.id], fields) if current_data is not None: # Check if the secret we wanna delete actually exists @@ -1952,7 +2169,43 @@ def _check_deleted_label(self, relation, fields) -> None: ", ".join(non_existent), ) - def _remove_secret_from_databag(self, relation, fields: List[str]) -> None: + @legacy_apply_from_version(18) + def _legacy_compat_secret_uri_from_databag(self, relation) -> None: + """Fetching the secret URI from the databag, in case stored there.""" + self._legacy_secret_uri = relation.data[self.component].get( + self._generate_secret_field_name(), None + ) + + @legacy_apply_from_version(34) + def _legacy_compat_generate_prev_labels(self) -> None: + """Generator for legacy secret label names, for backwards compatibility. + + Secret label is part of the data that MUST be maintained across rolling upgrades. + In case there may be a change on a secret label, the old label must be recognized + after upgrades, and left intact until the first write operation -- when we roll over + to the new label. + + This function keeps "memory" of previously used secret labels. + NOTE: Return value takes decorator into account -- all 'legacy' functions may return `None` + + v0.34 (rev69): Fixing issue https://github.com/canonical/data-platform-libs/issues/155 + meant moving from '.' (i.e. 'mysql.app', 'mysql.unit') + to labels '..' (like 'peer.mysql.app') + """ + if self._legacy_labels: + return + + result = [] + members = [self._model.app.name] + if self.scope: + members.append(self.scope.value) + result.append(f"{'.'.join(members)}") + self._legacy_labels = result + + # Migration + + @legacy_apply_from_version(18) + def _legacy_migration_remove_secret_from_databag(self, relation, fields: List[str]) -> None: """For Rolling Upgrades -- when moving from databag to secrets usage. Practically what happens here is to remove stuff from the databag that is @@ -1966,10 +2219,16 @@ def _remove_secret_from_databag(self, relation, fields: List[str]) -> None: if self._fetch_relation_data_without_secrets(self.component, relation, [field]): self._delete_relation_data_without_secrets(self.component, relation, [field]) - def _remove_secret_field_name_from_databag(self, relation) -> None: + @legacy_apply_from_version(18) + def _legacy_migration_remove_secret_field_name_from_databag(self, relation) -> None: """Making sure that the old databag URI is gone. This action should not be executed more than once. + + There was a phase (before moving secrets usage to libs) when charms saved the peer + secret URI to the databag, and used this URI from then on to retrieve their secret. + When upgrading to charm versions using this library, we need to add a label to the + secret and access it via label from than on, and remove the old traces from the databag. 
""" # Nothing to do if 'internal-secret' is not in the databag if not (relation.data[self.component].get(self._generate_secret_field_name())): @@ -1985,25 +2244,9 @@ def _remove_secret_field_name_from_databag(self, relation) -> None: # Databag reference to the secret URI can be removed, now that it's labelled relation.data[self.component].pop(self._generate_secret_field_name(), None) - def _previous_labels(self) -> List[str]: - """Generator for legacy secret label names, for backwards compatibility.""" - result = [] - members = [self._model.app.name] - if self.scope: - members.append(self.scope.value) - result.append(f"{'.'.join(members)}") - return result - - def _no_group_with_databag(self, field: str, full_field: str) -> bool: - """Check that no secret group is attempted to be used together with databag.""" - if not self.secrets_enabled and full_field != field: - logger.error( - f"Can't access {full_field}: no secrets available (i.e. no secret groups either)." - ) - return False - return True - + ########################################################################## # Event handlers + ########################################################################## def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: """Event emitted when the relation has changed.""" @@ -2013,7 +2256,9 @@ def _on_secret_changed_event(self, event: SecretChangedEvent) -> None: """Event emitted when the secret has changed.""" pass + ########################################################################## # Overrides of Relation Data handling functions + ########################################################################## def _generate_secret_label( self, relation_name: str, relation_id: int, group_mapping: SecretGroup @@ -2050,13 +2295,14 @@ def _get_relation_secret( return label = self._generate_secret_label(relation_name, relation_id, group_mapping) - secret_uri = relation.data[self.component].get(self._generate_secret_field_name(), None) # URI or legacy label is only to applied when moving single legacy secret to a (new) label if group_mapping == SECRET_GROUPS.EXTRA: # Fetching the secret with fallback to URI (in case label is not yet known) # Label would we "stuck" on the secret in case it is found - return self.secrets.get(label, secret_uri, legacy_labels=self._previous_labels()) + return self.secrets.get( + label, self._legacy_secret_uri, legacy_labels=self._legacy_labels + ) return self.secrets.get(label) def _get_group_secret_contents( @@ -2086,7 +2332,6 @@ def _fetch_my_specific_relation_data( @either_static_or_dynamic_secrets def _update_relation_data(self, relation: Relation, data: Dict[str, str]) -> None: """Update data available (directily or indirectly -- i.e. secrets) from the relation for owner/this_app.""" - self._remove_secret_from_databag(relation, list(data.keys())) _, normal_fields = self._process_secret_fields( relation, self.secret_fields, @@ -2095,7 +2340,6 @@ def _update_relation_data(self, relation: Relation, data: Dict[str, str]) -> Non data=data, uri_to_databag=False, ) - self._remove_secret_field_name_from_databag(relation) normal_content = {k: v for k, v in data.items() if k in normal_fields} self._update_relation_data_without_secrets(self.component, relation, normal_content) @@ -2104,8 +2348,6 @@ def _update_relation_data(self, relation: Relation, data: Dict[str, str]) -> Non def _delete_relation_data(self, relation: Relation, fields: List[str]) -> None: """Delete data available (directily or indirectly -- i.e. 
secrets) from the relation for owner/this_app.""" if self.secret_fields and self.deleted_label: - # Legacy, backwards compatibility - self._check_deleted_label(relation, fields) _, normal_fields = self._process_secret_fields( relation, @@ -2141,7 +2383,9 @@ def fetch_relation_field( "fetch_my_relation_data() and fetch_my_relation_field()" ) + ########################################################################## # Public functions -- inherited + ########################################################################## fetch_my_relation_data = Data.fetch_my_relation_data fetch_my_relation_field = Data.fetch_my_relation_field @@ -2606,6 +2850,14 @@ def set_version(self, relation_id: int, version: str) -> None: """ self.update_relation_data(relation_id, {"version": version}) + def set_subordinated(self, relation_id: int) -> None: + """Raises the subordinated flag in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + """ + self.update_relation_data(relation_id, {"subordinated": "true"}) + class DatabaseProviderEventHandlers(EventHandlers): """Provider-side of the database relation handlers.""" @@ -2842,6 +3094,21 @@ def _on_relation_created_event(self, event: RelationCreatedEvent) -> None: def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: """Event emitted when the database relation has changed.""" + is_subordinate = False + remote_unit_data = None + for key in event.relation.data.keys(): + if isinstance(key, Unit) and not key.name.startswith(self.charm.app.name): + remote_unit_data = event.relation.data[key] + elif isinstance(key, Application) and key.name != self.charm.app.name: + is_subordinate = event.relation.data[key].get("subordinated") == "true" + + if is_subordinate: + if not remote_unit_data: + return + + if remote_unit_data.get("state") != "ready": + return + # Check which data has changed to emit customs events. 
diff = self._diff(event) diff --git a/metadata.yaml b/metadata.yaml index 123f250..f5c6992 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -44,6 +44,9 @@ requires: spark-service-account: interface: spark_service_account limit: 1 + zookeeper: + interface: zookeeper + limit: 1 provides: diff --git a/src/charm.py b/src/charm.py index 8f277b1..bdfd3d8 100755 --- a/src/charm.py +++ b/src/charm.py @@ -21,6 +21,7 @@ from events.integration_hub import SparkIntegrationHubEvents from events.kyuubi import KyuubiEvents from events.metastore import MetastoreEvents +from events.zookeeper import ZookeeperEvents from events.s3 import S3Events # Log messages can be retrieved using juju debug-log @@ -47,6 +48,7 @@ def __init__(self, *args): self.hub_events = SparkIntegrationHubEvents(self, self.context, self.workload) self.metastore_events = MetastoreEvents(self, self.context, self.workload) self.auth_events = AuthenticationEvents(self, self.context, self.workload) + self.zookeeper_events = ZookeeperEvents(self, self.context, self.workload) self.action_events = ActionEvents(self, self.context, self.workload) diff --git a/src/config/kyuubi.py b/src/config/kyuubi.py index 77f41b0..1aba54d 100644 --- a/src/config/kyuubi.py +++ b/src/config/kyuubi.py @@ -8,15 +8,16 @@ from typing import Optional from constants import AUTHENTICATION_TABLE_NAME -from core.domain import DatabaseConnectionInfo +from core.domain import DatabaseConnectionInfo, ZookeeperInfo from utils.logging import WithLogging class KyuubiConfig(WithLogging): """Kyuubi Configurations.""" - def __init__(self, db_info: Optional[DatabaseConnectionInfo]): + def __init__(self, db_info: Optional[DatabaseConnectionInfo], zookeeper_info: Optional[ZookeeperInfo]): self.db_info = db_info + self.zookeeper_info = zookeeper_info def _get_db_connection_url(self) -> str: endpoint = self.db_info.endpoint @@ -28,6 +29,14 @@ def _get_authentication_query(self) -> str: "WHERE username=${user} AND passwd=${password}" ) + def _get_zookeeper_auth_digest(self) -> str: + if not self.zookeeper_info: + return "" + username = self.zookeeper_info.username + password = self.zookeeper_info.password + return f"{username}:{password}" + + @property def _auth_conf(self) -> dict[str, str]: if not self.db_info: @@ -41,9 +50,20 @@ def _auth_conf(self) -> dict[str, str]: "kyuubi.authentication.jdbc.query": self._get_authentication_query(), } + @property + def _ha_conf(self) -> dict[str, str]: + if not self.zookeeper_info: + return {} + return { + "kyuubi.ha.addresses": self.zookeeper_info.uris, + "kyuubi.ha.namespace": self.zookeeper_info.namespace, + "kyuubi.ha.zookeeper.auth.type": "DIGEST", + "kyuubi.ha.zookeeper.auth.digest": self._get_zookeeper_auth_digest(), + } + def to_dict(self) -> dict[str, str]: """Return the dict representation of the configuration file.""" - return self._auth_conf + return self._auth_conf | self._ha_conf @property def contents(self) -> str: diff --git a/src/constants.py b/src/constants.py index e167c11..eaa711f 100644 --- a/src/constants.py +++ b/src/constants.py @@ -14,6 +14,7 @@ POSTGRESQL_METASTORE_DB_REL = "metastore-db" POSTGRESQL_AUTH_DB_REL = "auth-db" SPARK_SERVICE_ACCOUNT_REL = "spark-service-account" +ZOOKEEPER_REL = "zookeeper" NAMESPACE_CONFIG_NAME = "namespace" SERVICE_ACCOUNT_CONFIG_NAME = "service-account" diff --git a/src/core/context.py b/src/core/context.py index d3d0e59..06c6116 100644 --- a/src/core/context.py +++ b/src/core/context.py @@ -15,8 +15,9 @@ POSTGRESQL_METASTORE_DB_REL, S3_INTEGRATOR_REL, SPARK_SERVICE_ACCOUNT_REL, + 
ZOOKEEPER_REL ) -from core.domain import DatabaseConnectionInfo, S3ConnectionInfo, SparkServiceAccountInfo +from core.domain import DatabaseConnectionInfo, S3ConnectionInfo, SparkServiceAccountInfo, ZookeeperInfo from utils.logging import WithLogging @@ -31,10 +32,13 @@ def __init__(self, model: Model, config: ConfigData): ) self.auth_db_requirer = DatabaseRequirerData( self.model, - POSTGRESQL_AUTH_DB_REL, + POSTGRESQL_AUTH_DB_REL, database_name=AUTHENTICATION_DATABASE_NAME, extra_user_roles="superuser", ) + self.zookeeper_requirer_data = DatabaseRequirerData( + self.model, ZOOKEEPER_REL, database_name="/kyuubi" + ) @property def _s3_relation(self) -> Relation | None: @@ -46,6 +50,11 @@ def _spark_account_relation(self) -> Relation | None: """The integration hub relation.""" return self.model.get_relation(SPARK_SERVICE_ACCOUNT_REL) + @property + def _zookeeper_relation(self) -> Relation | None: + """The zookeeper relation""" + return self.model.get_relation(ZOOKEEPER_REL) + # --- DOMAIN OBJECTS --- @property @@ -91,6 +100,12 @@ def service_account(self) -> SparkServiceAccountInfo | None: ): return account + @property + def zookeeper(self) -> ZookeeperInfo | None: + """The state of the Zookeeper information.""" + return ZookeeperInfo(rel, self.zookeeper_requirer_data, rel.app) if (rel := self._zookeeper_relation) else None + + def is_authentication_enabled(self) -> bool: """Returns whether the authentication has been enabled in the Kyuubi charm.""" return bool(self.auth_db) diff --git a/src/core/domain.py b/src/core/domain.py index 4a15a58..23d1113 100644 --- a/src/core/domain.py +++ b/src/core/domain.py @@ -32,6 +32,7 @@ class Status(Enum): MISSING_OBJECT_STORAGE_BACKEND = BlockedStatus("Missing Object Storage backend") INVALID_CREDENTIALS = BlockedStatus("Invalid S3 credentials") MISSING_INTEGRATION_HUB = BlockedStatus("Missing integration hub relation") + MISSING_ZOOKEEPER = BlockedStatus("Missing zookeeper relation") INVALID_NAMESPACE = BlockedStatus("Invalid config option: namespace") INVALID_SERVICE_ACCOUNT = BlockedStatus("Invalid config option: service-account") @@ -152,3 +153,110 @@ def service_account(self): def namespace(self): """Namespace used for running Spark jobs.""" return self.relation_data["namespace"] + + +class ZookeeperInfo(RelationState): + """State collection metadata for a the Zookeeper relation.""" + + def __init__( + self, + relation: Relation | None, + data_interface: Data, + local_app: Application | None = None, + ): + super().__init__(relation, data_interface, None) + self._local_app = local_app + + @property + def username(self) -> str: + """Username to connect to ZooKeeper.""" + if not self.relation: + return "" + + return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="username" + ) + or "" + ) + + @property + def password(self) -> str: + """Password of the ZooKeeper user.""" + if not self.relation: + return "" + + return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="password" + ) + or "" + ) + + @property + def endpoints(self) -> str: + """IP/host where ZooKeeper is located.""" + if not self.relation: + return "" + + return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="endpoints" + ) + or "" + ) + + @property + def database(self) -> str: + """Path allocated for Kafka on ZooKeeper.""" + if not self.relation: + return "" + + return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="database" + ) + or self.chroot 
+ ) + + @property + def uris(self) -> str: + """Comma separated connection string, containing endpoints""" + if not self.relation: + return "" + + return ",".join( + sorted( # sorting as they may be disordered + ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="uris" + ) + or "" + ).split(",") + ) + ).replace(self.namespace, "") + + @property + def namespace(self) -> str: + """Path allocated for Kyuubi on ZooKeeper.""" + if not self.relation: + return "" + + return ( + self.data_interface.fetch_relation_field(relation_id=self.relation.id, field="chroot") + or "" + ) + + + @property + def zookeeper_connected(self) -> bool: + """Checks if there is an active ZooKeeper relation with all necessary data. + + Returns: + True if ZooKeeper is currently related with sufficient relation data + for a broker to connect with. Otherwise False + """ + if not all([self.username, self.password, self.endpoints, self.database, self.uris]): + return False + + return True diff --git a/src/events/base.py b/src/events/base.py index ab13f35..17b1961 100644 --- a/src/events/base.py +++ b/src/events/base.py @@ -10,7 +10,7 @@ from ops import CharmBase, EventBase, Object, StatusBase from core.context import Context -from core.domain import S3ConnectionInfo, SparkServiceAccountInfo, Status +from core.domain import Status from core.workload import KyuubiWorkloadBase from managers.k8s import K8sManager from managers.s3 import S3Manager @@ -26,28 +26,26 @@ class BaseEventHandler(Object, WithLogging): def get_app_status( self, - s3_info: S3ConnectionInfo | None, - service_account: SparkServiceAccountInfo | None, ) -> StatusBase: """Return the status of the charm.""" if not self.workload.ready(): return Status.WAITING_PEBBLE.value - if s3_info: - s3_manager = S3Manager(s3_info=s3_info) + if self.context.s3: + s3_manager = S3Manager(s3_info=self.context.s3) if not s3_manager.verify(): return Status.INVALID_CREDENTIALS.value - if not service_account: + if not self.context.service_account: return Status.MISSING_INTEGRATION_HUB.value - k8s_manager = K8sManager(service_account_info=service_account, workload=self.workload) + k8s_manager = K8sManager(service_account_info=self.context.service_account, workload=self.workload) # Check whether any one of object storage backend has been configured # Currently, we do this check on the basis of presence of Spark properties # TODO: Rethink on this approach with a more sturdy solution if ( - not s3_info + not self.context.s3 and not k8s_manager.is_s3_configured() and not k8s_manager.is_azure_storage_configured() ): @@ -59,6 +57,9 @@ def get_app_status( if not k8s_manager.is_service_account_valid(): return Status.INVALID_SERVICE_ACCOUNT.value + if not self.context.zookeeper: + return Status.MISSING_ZOOKEEPER.value + return Status.ACTIVE.value @@ -72,12 +73,8 @@ def wrapper_hook(event_handler: BaseEventHandler, event: EventBase): """Return output after resetting statuses.""" res = hook(event_handler, event) if event_handler.charm.unit.is_leader(): - event_handler.charm.app.status = event_handler.get_app_status( - event_handler.context.s3, event_handler.context.service_account - ) - event_handler.charm.unit.status = event_handler.get_app_status( - event_handler.context.s3, event_handler.context.service_account - ) + event_handler.charm.app.status = event_handler.get_app_status() + event_handler.charm.unit.status = event_handler.get_app_status() return res return wrapper_hook diff --git a/src/events/zookeeper.py b/src/events/zookeeper.py new file mode 100644 
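Tying together the ZookeeperInfo properties introduced above, a small worked example of how `uris` is derived from the relation data (endpoint and chroot values are made up):

```python
# Illustrative only: mirrors ZookeeperInfo.uris / ZookeeperInfo.namespace
# for made-up relation data written by the ZooKeeper charm.
raw_uris = "10.0.0.12:2181/kyuubi,10.0.0.11:2181/kyuubi"  # "uris" field
namespace = "/kyuubi"                                      # "chroot" field
uris = ",".join(sorted(raw_uris.split(","))).replace(namespace, "")
assert uris == "10.0.0.11:2181,10.0.0.12:2181"
```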
index 0000000..71c9127 --- /dev/null +++ b/src/events/zookeeper.py @@ -0,0 +1,59 @@ + +from charms.data_platform_libs.v0.data_interfaces import DatabaseRequirerEventHandlers +from constants import ZOOKEEPER_REL +from core.workload import KyuubiWorkloadBase +from events.base import BaseEventHandler, compute_status +from utils.logging import WithLogging +from managers.kyuubi import KyuubiManager +from ops import CharmBase +from core.context import Context + + +class ZookeeperEvents(BaseEventHandler, WithLogging): + """Class implementing Zookeeper integration event hooks.""" + + def __init__(self, charm: CharmBase, context: Context, workload: KyuubiWorkloadBase): + super().__init__(charm, "zookeeper") + + self.charm = charm + self.context = context + self.workload = workload + + self.kyuubi = KyuubiManager(self.workload) + self.zookeeper_handler = DatabaseRequirerEventHandlers(self.charm, self.context.zookeeper_requirer_data) + + # self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_created, self._on_zookeeper_created) + # self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_joined, self._on_zookeeper_joined) + self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_changed, self._on_zookeeper_changed) + self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_broken, self._on_zookeeper_broken) + + + # def _on_zookeeper_created(self, _): + # self.logger.warning("Zookeeper created...") + # self.logger.warning(self.context.zookeeper.endpoints) + + + # def _on_zookeeper_joined(self, _): + # self.logger.warning("Zookeeper joined...") + + @compute_status + def _on_zookeeper_changed(self, _): + self.logger.info("Zookeeper relation changed...") + self.kyuubi.update( + s3_info=self.context.s3, + metastore_db_info=self.context.metastore_db, + auth_db_info=self.context.auth_db, + service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper + ) + + @compute_status + def _on_zookeeper_broken(self, _): + self.logger.info("Zookeeper relation broken...") + self.kyuubi.update( + s3_info=self.context.s3, + metastore_db_info=self.context.metastore_db, + auth_db_info=self.context.auth_db, + service_account_info=self.context.service_account, + zookeeper_info=None + ) diff --git a/src/managers/kyuubi.py b/src/managers/kyuubi.py index 1c00063..9b3beaa 100644 --- a/src/managers/kyuubi.py +++ b/src/managers/kyuubi.py @@ -7,7 +7,7 @@ from config.hive import HiveConfig from config.kyuubi import KyuubiConfig from config.spark import SparkConfig -from core.domain import DatabaseConnectionInfo, S3ConnectionInfo, SparkServiceAccountInfo +from core.domain import DatabaseConnectionInfo, S3ConnectionInfo, SparkServiceAccountInfo, ZookeeperInfo from core.workload import KyuubiWorkloadBase from utils.logging import WithLogging @@ -24,13 +24,14 @@ def update( metastore_db_info: DatabaseConnectionInfo | None, auth_db_info: DatabaseConnectionInfo | None, service_account_info: SparkServiceAccountInfo | None, + zookeeper_info: ZookeeperInfo | None, ): """Update Kyuubi service and restart it.""" spark_config = SparkConfig( s3_info=s3_info, service_account_info=service_account_info ).contents hive_config = HiveConfig(db_info=metastore_db_info).contents - kyuubi_config = KyuubiConfig(db_info=auth_db_info).contents + kyuubi_config = KyuubiConfig(db_info=auth_db_info, zookeeper_info=zookeeper_info).contents self.workload.write(spark_config, self.workload.SPARK_PROPERTIES_FILE) self.workload.write(hive_config, self.workload.HIVE_CONFIGURATION_FILE) From 
bc6e246cadb5221ea542115657e9abc78decca10 Mon Sep 17 00:00:00 2001 From: Bikalpa Dhakal Date: Tue, 20 Aug 2024 08:09:02 +0000 Subject: [PATCH 2/8] wip --- src/core/context.py | 2 +- src/events/auth.py | 3 +++ src/events/integration_hub.py | 2 ++ src/events/kyuubi.py | 2 ++ src/events/metastore.py | 2 ++ src/events/s3.py | 2 ++ src/events/zookeeper.py | 23 +++++++++++++++-------- 7 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/core/context.py b/src/core/context.py index 06c6116..9f8cd4f 100644 --- a/src/core/context.py +++ b/src/core/context.py @@ -37,7 +37,7 @@ def __init__(self, model: Model, config: ConfigData): extra_user_roles="superuser", ) self.zookeeper_requirer_data = DatabaseRequirerData( - self.model, ZOOKEEPER_REL, database_name="/kyuubi" + self.model, ZOOKEEPER_REL, database_name="kyuubist" ) @property diff --git a/src/events/auth.py b/src/events/auth.py index d40dbfc..127c904 100644 --- a/src/events/auth.py +++ b/src/events/auth.py @@ -56,6 +56,7 @@ def _on_auth_db_created(self, event: DatabaseCreatedEvent) -> None: metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper, ) @compute_status @@ -67,6 +68,7 @@ def _on_auth_db_endpoints_changed(self, event) -> None: metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper, ) @compute_status @@ -78,6 +80,7 @@ def _on_auth_db_relation_removed(self, event) -> None: metastore_db_info=self.context.metastore_db, auth_db_info=None, service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper, ) @compute_status diff --git a/src/events/integration_hub.py b/src/events/integration_hub.py index 2c503de..e461263 100644 --- a/src/events/integration_hub.py +++ b/src/events/integration_hub.py @@ -59,6 +59,7 @@ def _on_account_granted(self, _: ServiceAccountGrantedEvent): metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper, ) @compute_status @@ -71,4 +72,5 @@ def _on_account_gone(self, _: ServiceAccountGoneEvent): metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=None, + zookeeper_info=self.context.zookeeper, ) diff --git a/src/events/kyuubi.py b/src/events/kyuubi.py index ca8a0af..d9f0c35 100644 --- a/src/events/kyuubi.py +++ b/src/events/kyuubi.py @@ -50,6 +50,7 @@ def _on_config_changed(self, event: ops.ConfigChangedEvent) -> None: metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper, ) @compute_status @@ -67,4 +68,5 @@ def _on_kyuubi_pebble_ready(self, event: ops.PebbleReadyEvent): metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper, ) diff --git a/src/events/metastore.py b/src/events/metastore.py index f322465..a11294f 100644 --- a/src/events/metastore.py +++ b/src/events/metastore.py @@ -52,6 +52,7 @@ def _on_metastore_db_created(self, event: DatabaseCreatedEvent) -> None: metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper, ) @compute_status @@ -63,4 
+64,5 @@ def _on_metastore_db_relation_removed(self, event) -> None: metastore_db_info=None, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper, ) diff --git a/src/events/s3.py b/src/events/s3.py index 1d5d1ee..ebee424 100644 --- a/src/events/s3.py +++ b/src/events/s3.py @@ -47,6 +47,7 @@ def _on_s3_credential_changed(self, _: CredentialsChangedEvent): metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper, ) @compute_status @@ -59,4 +60,5 @@ def _on_s3_credential_gone(self, _: CredentialsGoneEvent): metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, + zookeeper_info=self.context.zookeeper, ) diff --git a/src/events/zookeeper.py b/src/events/zookeeper.py index 71c9127..ea5a088 100644 --- a/src/events/zookeeper.py +++ b/src/events/zookeeper.py @@ -22,23 +22,30 @@ def __init__(self, charm: CharmBase, context: Context, workload: KyuubiWorkloadB self.kyuubi = KyuubiManager(self.workload) self.zookeeper_handler = DatabaseRequirerEventHandlers(self.charm, self.context.zookeeper_requirer_data) - # self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_created, self._on_zookeeper_created) - # self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_joined, self._on_zookeeper_joined) + self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_created, self._on_zookeeper_created) + self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_joined, self._on_zookeeper_joined) self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_changed, self._on_zookeeper_changed) self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_broken, self._on_zookeeper_broken) - # def _on_zookeeper_created(self, _): - # self.logger.warning("Zookeeper created...") - # self.logger.warning(self.context.zookeeper.endpoints) + def _on_zookeeper_created(self, _): + self.logger.warning("Zookeeper created...") + self.logger.warning(self.context._zookeeper_relation.data) - # def _on_zookeeper_joined(self, _): - # self.logger.warning("Zookeeper joined...") + def _on_zookeeper_joined(self, _): + self.logger.warning("Zookeeper joined...") + self.logger.warning(self.context._zookeeper_relation.data) + @compute_status def _on_zookeeper_changed(self, _): - self.logger.info("Zookeeper relation changed...") + self.logger.info("Zookeeper relation changed new...") + self.logger.info(self.context.zookeeper.uris) + self.logger.info(self.context.zookeeper.username) + self.logger.info(self.context.zookeeper.password) + self.logger.info(self.context.zookeeper.namespace) + self.logger.info(self.context._zookeeper_relation.data) self.kyuubi.update( s3_info=self.context.s3, metastore_db_info=self.context.metastore_db, From 996649df92393fa654ad435ff35dc527198a8b3f Mon Sep 17 00:00:00 2001 From: Bikalpa Dhakal Date: Tue, 20 Aug 2024 16:14:20 +0000 Subject: [PATCH 3/8] Issue with kyuubi and zookeeper --- src/constants.py | 12 +++++++++--- src/core/context.py | 9 +++++++-- src/core/workload/kyuubi.py | 6 ++++++ src/events/actions.py | 29 +++++++++++++++-------------- 4 files changed, 37 insertions(+), 19 deletions(-) diff --git a/src/constants.py b/src/constants.py index eaa711f..dda1e91 100644 --- a/src/constants.py +++ b/src/constants.py @@ -5,23 +5,29 @@ KYUUBI_CONTAINER_NAME = "kyuubi" KYUUBI_SERVICE_NAME = "kyuubi" + +# Database related literals 
METASTORE_DATABASE_NAME = "hivemetastore" AUTHENTICATION_DATABASE_NAME = "auth_db" AUTHENTICATION_TABLE_NAME = "kyuubi_users" POSTGRESQL_DEFAULT_DATABASE = "postgres" +# Relation names S3_INTEGRATOR_REL = "s3-credentials" POSTGRESQL_METASTORE_DB_REL = "metastore-db" POSTGRESQL_AUTH_DB_REL = "auth-db" SPARK_SERVICE_ACCOUNT_REL = "spark-service-account" ZOOKEEPER_REL = "zookeeper" +KYUUBI_CLIENT_RELATION_NAME = "jdbc" +# Literals related to K8s NAMESPACE_CONFIG_NAME = "namespace" SERVICE_ACCOUNT_CONFIG_NAME = "service-account" +# Literals related to Kyuubi JDBC_PORT = 10009 - KYUUBI_OCI_IMAGE = "ghcr.io/canonical/charmed-spark-kyuubi:3.4-22.04_edge" - DEFAULT_ADMIN_USERNAME = "admin" -KYUUBI_CLIENT_RELATION_NAME = "jdbc" + +# Zookeeper literals +HA_ZNODE_NAME = "/kyuubi" \ No newline at end of file diff --git a/src/core/context.py b/src/core/context.py index 9f8cd4f..3ef5fdb 100644 --- a/src/core/context.py +++ b/src/core/context.py @@ -15,7 +15,8 @@ POSTGRESQL_METASTORE_DB_REL, S3_INTEGRATOR_REL, SPARK_SERVICE_ACCOUNT_REL, - ZOOKEEPER_REL + ZOOKEEPER_REL, + HA_ZNODE_NAME ) from core.domain import DatabaseConnectionInfo, S3ConnectionInfo, SparkServiceAccountInfo, ZookeeperInfo from utils.logging import WithLogging @@ -37,7 +38,7 @@ def __init__(self, model: Model, config: ConfigData): extra_user_roles="superuser", ) self.zookeeper_requirer_data = DatabaseRequirerData( - self.model, ZOOKEEPER_REL, database_name="kyuubist" + self.model, ZOOKEEPER_REL, database_name=HA_ZNODE_NAME ) @property @@ -109,3 +110,7 @@ def zookeeper(self) -> ZookeeperInfo | None: def is_authentication_enabled(self) -> bool: """Returns whether the authentication has been enabled in the Kyuubi charm.""" return bool(self.auth_db) + + def is_ha_enabled(self) -> bool: + """Returns whether HA has been enabled in the Kyuubi charm.""" + return bool(self.zookeeper) \ No newline at end of file diff --git a/src/core/workload/kyuubi.py b/src/core/workload/kyuubi.py index 87d54bb..3cdfea8 100644 --- a/src/core/workload/kyuubi.py +++ b/src/core/workload/kyuubi.py @@ -28,6 +28,12 @@ def __init__(self, container: Container, user: User = User()): self.container = container self.user = user + def get_ip_address(self) -> str: + """Return the IP address of the unit running the workload""" + hostname = socket.getfqdn() + ip_address = socket.gethostbyname(hostname) + return ip_address + def get_jdbc_endpoint(self) -> str: """Return the JDBC endpoint to connect to Kyuubi server.""" hostname = socket.getfqdn() diff --git a/src/events/actions.py b/src/events/actions.py index bd174cf..ff87afa 100644 --- a/src/events/actions.py +++ b/src/events/actions.py @@ -7,7 +7,7 @@ from ops import CharmBase from ops.charm import ActionEvent -from constants import DEFAULT_ADMIN_USERNAME +from constants import DEFAULT_ADMIN_USERNAME, JDBC_PORT from core.context import Context from core.domain import Status from core.workload import KyuubiWorkloadBase @@ -40,14 +40,21 @@ def _on_get_jdbc_endpoint(self, event: ActionEvent): event.fail("The action failed because the workload is not ready yet.") return if ( - not self.get_app_status( - s3_info=self.context.s3, service_account=self.context.service_account - ) - != Status.ACTIVE + self.get_app_status() != Status.ACTIVE.value ): event.fail("The action failed because the charm is not in active state.") return - result = {"endpoint": self.workload.get_jdbc_endpoint()} + + if self.context.is_ha_enabled(): + address = self.context.zookeeper.uris + namespace = self.context.zookeeper.namespace + if not 
address.endswith("/"): + address += "/" + endpoint = f"jdbc:hive2://{address};serviceDiscoveryMode=zooKeeper;zooKeeperNamespace={namespace}" + else: + address = self.workload.get_ip_address() + endpoint = f"jdbc:hive2://{address}:{JDBC_PORT}/" + result = {"endpoint": endpoint} event.set_results(result) def _on_get_password(self, event: ActionEvent) -> None: @@ -62,10 +69,7 @@ def _on_get_password(self, event: ActionEvent) -> None: event.fail("The action failed because the workload is not ready yet.") return if ( - not self.get_app_status( - s3_info=self.context.s3, service_account=self.context.service_account - ) - != Status.ACTIVE + self.get_app_status() != Status.ACTIVE.value ): event.fail("The action failed because the charm is not in active state.") return @@ -90,10 +94,7 @@ def _on_set_password(self, event: ActionEvent) -> None: event.fail("The action failed because the workload is not ready yet.") return if ( - not self.get_app_status( - s3_info=self.context.s3, service_account=self.context.service_account - ) - != Status.ACTIVE + self.get_app_status() != Status.ACTIVE.value ): event.fail("The action failed because the charm is not in active state.") return From 1dc75cdc2e04bdda3800eedeae502856979348b5 Mon Sep 17 00:00:00 2001 From: Bikalpa Dhakal Date: Thu, 22 Aug 2024 08:19:48 +0000 Subject: [PATCH 4/8] Update trivial integration tests, unit tests --- lib/charms/zookeeper/v0/client.py | 610 ++++++++++++++++++++++++++++++ src/charm.py | 2 +- src/config/kyuubi.py | 7 +- src/constants.py | 2 +- src/core/context.py | 22 +- src/core/domain.py | 28 +- src/core/workload/kyuubi.py | 2 +- src/events/actions.py | 12 +- src/events/base.py | 7 +- src/events/zookeeper.py | 46 +-- src/managers/kyuubi.py | 7 +- tests/integration/conftest.py | 9 + tests/integration/test_charm.py | 86 +++++ tests/unit/conftest.py | 22 +- tests/unit/test_charm.py | 84 ++++ 15 files changed, 873 insertions(+), 73 deletions(-) create mode 100644 lib/charms/zookeeper/v0/client.py diff --git a/lib/charms/zookeeper/v0/client.py b/lib/charms/zookeeper/v0/client.py new file mode 100644 index 0000000..e6ce1c0 --- /dev/null +++ b/lib/charms/zookeeper/v0/client.py @@ -0,0 +1,610 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""ZooKeeperManager and ZooKeeperClient classes + +`ZooKeeperManager` provides an interface for performing actions that requires +a connection to the current ZK quorum leader, e.g updating zNodes, ACLs and quorum members. +On `__init__`, it loops through all passed hosts, attempts a `ZooKeeperClient` connection, and +checks leadership of each unit, storing the current quorum leader host as an attribute. + +In most cases, custom `Exception`s raised by `ZooKeeperManager` should trigger an `event.defer()`, +as they indicate that the servers are not ready to have actions performed upon them just yet. + +`ZooKeeperClient` serves as a handler for managing a ZooKeeper client connection to a +single unit. It's methods contain common 4lw commands, and functionality to read/write +to specific zNodes. +It is not expected to use this class from directly from charm code, +but to instead use the `ZooKeeperManager` class to perform it's actions on the ZK servers. + + +Instances of `ZooKeeperManager` are to be created by methods in either the `Charm` itself, +or from another library. 
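To make the earlier `get-jdbc-endpoint` change concrete, the two endpoint shapes the action can now return look roughly like this (hosts and namespace are illustrative; 10009 is the charm's JDBC_PORT constant):

```python
# Illustrative only: the two shapes produced by _on_get_jdbc_endpoint.
# With HA (ZooKeeper related) -- clients discover servers via ZooKeeper:
ha = ("jdbc:hive2://10.0.0.11:2181,10.0.0.12:2181/"
      ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=/kyuubi")
# Without HA -- direct connection to this unit on the JDBC port:
direct = "jdbc:hive2://10.1.15.4:10009/"
```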
+ +Example usage for `ZooKeeperManager`: + +```python + +def update_cluster(new_members: List[str], event: EventBase) -> None: + + try: + zk = ZooKeeperManager( + hosts=["10.141.73.20", "10.141.73.21"], + client_port=2181, + username="super", + password="password" + ) + + current_quorum_members = zk.server_members + + servers_to_remove = list(current_quorum_members - new_members) + zk.remove_members(servers_to_remove) + + servers_to_add = sorted(new_members - current_quorum_members) + zk.add_members(servers_to_add) + + except ( + MembersSyncingError, + MemberNotReadyError, + QuorumLeaderNotFoundError, + ) as e: + logger.info(str(e)) + event.defer() + return +``` +""" + +import logging +import re +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple + +from kazoo.client import ACL, KazooClient +from kazoo.handlers.threading import KazooTimeoutError +from tenacity import RetryError, retry +from tenacity.retry import retry_if_not_result +from tenacity.stop import stop_after_attempt +from tenacity.wait import wait_fixed + +# The unique Charmhub library identifier, never change it +LIBID = "4dc4430e6e5d492699391f57bd697fce" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 6 + + +logger = logging.getLogger(__name__) + +# Kazoo logs are unbearably chatty +logging.getLogger("kazoo.client").disabled = True + + +class MembersSyncingError(Exception): + """Generic exception for when quorum members are syncing data.""" + + pass + + +class MemberNotReadyError(Exception): + """Generic exception for when a zk unit can't be connected to or is not broadcasting.""" + + pass + + +class QuorumLeaderNotFoundError(Exception): + """Generic exception for when there are no zk leaders in the app.""" + + pass + + +class ZooKeeperManager: + """Handler for performing ZK commands.""" + + def __init__( + self, + hosts: List[str], + username: str, + password: str, + client_port: int = 2181, + use_ssl: bool = False, + keyfile_path: Optional[str] = "", + keyfile_password: Optional[str] = "", + certfile_path: Optional[str] = "", + ): + self.hosts = hosts + self.username = username + self.password = password + self.client_port = client_port + self.use_ssl = use_ssl + self.keyfile_path = keyfile_path + self.keyfile_password = keyfile_password + self.certfile_path = certfile_path + self.leader = "" + + try: + self.leader = self.get_leader() + except RetryError: + raise QuorumLeaderNotFoundError("quorum leader not found") + + @retry( + wait=wait_fixed(3), + stop=stop_after_attempt(2), + retry=retry_if_not_result(lambda result: True if result else False), + ) + def get_leader(self) -> str: + """Attempts to find the current ZK quorum leader. + + In the case when there is a leadership election, this may fail. + When this happens, we attempt 1 retry after 3 seconds. 
+ + Returns: + String of the host for the quorum leader + + Raises: + tenacity.RetryError: if the leader can't be found during the retry conditions + """ + leader = None + for host in self.hosts: + try: + with ZooKeeperClient( + host=host, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + response = zk.srvr + if response.get("Mode") == "leader": + leader = host + break + except KazooTimeoutError: # in the case of having a dead unit in relation data + logger.debug(f"TIMEOUT - {host}") + continue + + return leader or "" + + @property + def server_members(self) -> Set[str]: + """The current members within the ZooKeeper quorum. + + Returns: + A set of ZK member strings + e.g {"server.1=10.141.78.207:2888:3888:participant;0.0.0.0:2181"} + """ + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + members, _ = zk.config + + return set(members) + + @property + def config_version(self) -> int: + """The current config version for ZooKeeper. + + Returns: + The zookeeper config version decoded from base16 + """ + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + _, version = zk.config + + return version + + @property + def members_syncing(self) -> bool: + """Flag to check if any quorum members are currently syncing data. + + Returns: + True if any members are syncing. Otherwise False. + """ + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + result = zk.mntr + + try: + zk_pending_syncs = result["zk_pending_syncs"] + except KeyError: # missing key, no quorum, no syncing + return False + + if ( + result.get("zk_peer_state", "") == "leading - broadcast" + and float(zk_pending_syncs) == 0 + ): + return False + + return True + + @property + def members_broadcasting(self) -> bool: + """Flag to check if any quorum members are currently broadcasting. + + Returns: + True if any members are currently broadcasting. Otherwise False. + """ + for host in self.hosts: + try: + with ZooKeeperClient( + host=host, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + if not zk.is_ready: + return False + except KazooTimeoutError: # in the case of having a dead unit in relation data + logger.debug(f"TIMEOUT - {host}") + return False + + return True + + def add_members(self, members: Iterable[str]) -> None: + """Adds new members to the members' dynamic config. 
+ + Raises: + MembersSyncingError: if any members are busy syncing data + MemberNotReadyError: if any members are not yet broadcasting + """ + if self.members_syncing: + raise MembersSyncingError("Unable to add members - some members are syncing") + + for member in members: + host = member.split("=")[1].split(":")[0] + + try: + # individual connections to each server + with ZooKeeperClient( + host=host, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + if not zk.is_ready: + raise MemberNotReadyError(f"Server is not ready: {host}") + except KazooTimeoutError as e: # for when units are departing + logger.debug(str(e)) + continue + + # specific connection to leader + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + zk.client.reconfig( + joining=member, leaving=None, new_members=None, from_config=self.config_version + ) + + def remove_members(self, members: Iterable[str]) -> None: + """Removes members from the members' dynamic config. + + Raises: + MembersSyncingError: if any members are busy syncing data + """ + if self.members_syncing: + raise MembersSyncingError("Unable to remove members - some members are syncing") + + for member in members: + member_id = re.findall(r"server.([0-9]+)", member)[0] + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + zk.client.reconfig( + joining=None, + leaving=member_id, + new_members=None, + from_config=self.config_version, + ) + + def leader_znodes(self, path: str) -> Set[str]: + """Grabs all children zNodes for a path on the current quorum leader. + + Args: + path: the 'root' path to search from + + Returns: + Set of all nested child zNodes + """ + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + all_znode_children = zk.get_all_znode_children(path=path) + + return all_znode_children + + def create_znode_leader(self, path: str, acls: List[ACL]) -> None: + """Creates a new zNode on the current quorum leader with given ACLs. + + Args: + path: the zNode path to set + acls: the ACLs to be set on that path + """ + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + zk.create_znode(path=path, acls=acls) + + def set_acls_znode_leader(self, path: str, acls: List[ACL]) -> None: + """Updates ACLs for an existing zNode on the current quorum leader. 
+ + Args: + path: the zNode path to update + acls: the new ACLs to be set on that path + """ + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + zk.set_acls(path=path, acls=acls) + + def delete_znode_leader(self, path: str) -> None: + """Deletes a zNode path from the current quorum leader. + + Args: + path: the zNode path to delete + """ + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + zk.delete_znode(path=path) + + def get_version(self) -> str: + """Get ZooKeeper service version from srvr 4lw. + + Returns: + String of ZooKeeper service version + """ + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + return zk.srvr["Zookeeper version"].split("-", maxsplit=1)[0] + + +class ZooKeeperClient: + """Handler for ZooKeeper connections and running 4lw client commands.""" + + def __init__( + self, + host: str, + client_port: int, + username: str, + password: str, + use_ssl: bool = False, + keyfile_path: Optional[str] = None, + keyfile_password: Optional[str] = None, + certfile_path: Optional[str] = None, + ): + self.host = host + self.client_port = client_port + self.username = username + self.password = password + self.client = KazooClient( + hosts=f"{host}:{client_port}", + timeout=1.0, + sasl_options={"mechanism": "DIGEST-MD5", "username": username, "password": password}, + keyfile=keyfile_path, + keyfile_password=keyfile_password, + certfile=certfile_path, + verify_certs=False, + use_ssl=use_ssl, + ) + self.client.start() + + def __enter__(self): + return self + + def __exit__(self, object_type, value, traceback): + self.client.stop() + + def _run_4lw_command(self, command: str): + return self.client.command(command.encode()) + + @property + def config(self) -> Tuple[List[str], int]: + """Retrieves the dynamic config for a ZooKeeper service. + + Returns: + Tuple of the decoded config list, and decoded config version + """ + response = self.client.get("/zookeeper/config") + if response: + result = str(response[0].decode("utf-8")).splitlines() + version = int(result.pop(-1).split("=")[1], base=16) + else: + raise + + return result, version + + @property + def srvr(self) -> Dict[str, Any]: + """Retrieves attributes returned from the 'srvr' 4lw command. + + Returns: + Mapping of field and setting returned from `srvr` + """ + response = self._run_4lw_command("srvr") + + result = {} + for item in response.splitlines(): + k = re.split(": ", item)[0] + v = re.split(": ", item)[1] + result[k] = v + + return result + + @property + def mntr(self) -> Dict[str, Any]: + """Retrieves attributes returned from the 'mntr' 4lw command. 
+ + Returns: + Mapping of field and setting returned from `mntr` + """ + response = self._run_4lw_command("mntr") + + result = {} + for item in response.splitlines(): + if re.search("=|\\t", item): + k = re.split("=|\\t", item)[0] + v = re.split("=|\\t", item)[1] + result[k] = v + else: + result[item] = "" + + return result + + @property + def is_ready(self) -> bool: + """Flag to confirm connected ZooKeeper server is connected and broadcasting. + + Returns: + True if server is broadcasting. Otherwise False. + """ + if self.client.connected: + return "broadcast" in self.mntr.get("zk_peer_state", "") + return False + + def get_all_znode_children(self, path: str) -> Set[str]: + """Recursively gets all children for a given parent znode path. + + Args: + path: the desired parent znode path to recurse + + Returns: + Set of all nested children znode paths for the given parent + """ + children = self.client.get_children(path) or [] + + result = set() + for child in children: + if path + child != "/zookeeper": + result.update(self.get_all_znode_children(path.rstrip("/") + "/" + child)) + if path != "/": + result.add(path) + + return result + + def delete_znode(self, path: str) -> None: + """Drop znode and all it's children from ZK tree. + + Args: + path: the desired znode path to delete + """ + if not self.client.exists(path): + return + self.client.delete(path, recursive=True) + + def create_znode(self, path: str, acls: List[ACL]) -> None: + """Create new znode. + + Args: + path: the desired znode path to create + acls: the acls for the new znode + """ + self.client.create(path, acl=acls, makepath=True) + + def get_acls(self, path: str) -> List[ACL]: + """Gets acls for a desired znode path. + + Args: + path: the desired znode path + + Returns: + List of the acls set for the given znode + """ + acl_list = self.client.get_acls(path) + + return acl_list if acl_list else [] + + def set_acls(self, path: str, acls: List[ACL]) -> None: + """Sets acls for a desired znode path. 
+ + Args: + path: the desired znode path + acls: the acls to set to the given znode + """ + self.client.set_acls(path, acls) + diff --git a/src/charm.py b/src/charm.py index bdfd3d8..eef8cc7 100755 --- a/src/charm.py +++ b/src/charm.py @@ -21,8 +21,8 @@ from events.integration_hub import SparkIntegrationHubEvents from events.kyuubi import KyuubiEvents from events.metastore import MetastoreEvents -from events.zookeeper import ZookeeperEvents from events.s3 import S3Events +from events.zookeeper import ZookeeperEvents # Log messages can be retrieved using juju debug-log logger = logging.getLogger(__name__) diff --git a/src/config/kyuubi.py b/src/config/kyuubi.py index 1aba54d..bcddaba 100644 --- a/src/config/kyuubi.py +++ b/src/config/kyuubi.py @@ -15,7 +15,9 @@ class KyuubiConfig(WithLogging): """Kyuubi Configurations.""" - def __init__(self, db_info: Optional[DatabaseConnectionInfo], zookeeper_info: Optional[ZookeeperInfo]): + def __init__( + self, db_info: Optional[DatabaseConnectionInfo], zookeeper_info: Optional[ZookeeperInfo] + ): self.db_info = db_info self.zookeeper_info = zookeeper_info @@ -36,7 +38,6 @@ def _get_zookeeper_auth_digest(self) -> str: password = self.zookeeper_info.password return f"{username}:{password}" - @property def _auth_conf(self) -> dict[str, str]: if not self.db_info: @@ -56,7 +57,7 @@ def _ha_conf(self) -> dict[str, str]: return {} return { "kyuubi.ha.addresses": self.zookeeper_info.uris, - "kyuubi.ha.namespace": self.zookeeper_info.namespace, + "kyuubi.ha.namespace": self.zookeeper_info.database, "kyuubi.ha.zookeeper.auth.type": "DIGEST", "kyuubi.ha.zookeeper.auth.digest": self._get_zookeeper_auth_digest(), } diff --git a/src/constants.py b/src/constants.py index dda1e91..a7bfaa2 100644 --- a/src/constants.py +++ b/src/constants.py @@ -30,4 +30,4 @@ DEFAULT_ADMIN_USERNAME = "admin" # Zookeeper literals -HA_ZNODE_NAME = "/kyuubi" \ No newline at end of file +HA_ZNODE_NAME = "/kyuubi" diff --git a/src/core/context.py b/src/core/context.py index 3ef5fdb..6915f98 100644 --- a/src/core/context.py +++ b/src/core/context.py @@ -10,15 +10,20 @@ from common.relation.spark_sa import RequirerData from constants import ( AUTHENTICATION_DATABASE_NAME, + HA_ZNODE_NAME, METASTORE_DATABASE_NAME, POSTGRESQL_AUTH_DB_REL, POSTGRESQL_METASTORE_DB_REL, S3_INTEGRATOR_REL, SPARK_SERVICE_ACCOUNT_REL, ZOOKEEPER_REL, - HA_ZNODE_NAME ) -from core.domain import DatabaseConnectionInfo, S3ConnectionInfo, SparkServiceAccountInfo, ZookeeperInfo +from core.domain import ( + DatabaseConnectionInfo, + S3ConnectionInfo, + SparkServiceAccountInfo, + ZookeeperInfo, +) from utils.logging import WithLogging @@ -33,7 +38,7 @@ def __init__(self, model: Model, config: ConfigData): ) self.auth_db_requirer = DatabaseRequirerData( self.model, - POSTGRESQL_AUTH_DB_REL, + POSTGRESQL_AUTH_DB_REL, database_name=AUTHENTICATION_DATABASE_NAME, extra_user_roles="superuser", ) @@ -53,7 +58,7 @@ def _spark_account_relation(self) -> Relation | None: @property def _zookeeper_relation(self) -> Relation | None: - """The zookeeper relation""" + """The zookeeper relation.""" return self.model.get_relation(ZOOKEEPER_REL) # --- DOMAIN OBJECTS --- @@ -104,8 +109,11 @@ def service_account(self) -> SparkServiceAccountInfo | None: @property def zookeeper(self) -> ZookeeperInfo | None: """The state of the Zookeeper information.""" - return ZookeeperInfo(rel, self.zookeeper_requirer_data, rel.app) if (rel := self._zookeeper_relation) else None - + return ( + ZookeeperInfo(rel, self.zookeeper_requirer_data, rel.app) + if (rel 
:= self._zookeeper_relation) + else None + ) def is_authentication_enabled(self) -> bool: """Returns whether the authentication has been enabled in the Kyuubi charm.""" @@ -113,4 +121,4 @@ def is_authentication_enabled(self) -> bool: def is_ha_enabled(self) -> bool: """Returns whether HA has been enabled in the Kyuubi charm.""" - return bool(self.zookeeper) \ No newline at end of file + return bool(self.zookeeper) diff --git a/src/core/domain.py b/src/core/domain.py index 23d1113..1324fe0 100644 --- a/src/core/domain.py +++ b/src/core/domain.py @@ -32,10 +32,8 @@ class Status(Enum): MISSING_OBJECT_STORAGE_BACKEND = BlockedStatus("Missing Object Storage backend") INVALID_CREDENTIALS = BlockedStatus("Invalid S3 credentials") MISSING_INTEGRATION_HUB = BlockedStatus("Missing integration hub relation") - MISSING_ZOOKEEPER = BlockedStatus("Missing zookeeper relation") INVALID_NAMESPACE = BlockedStatus("Invalid config option: namespace") INVALID_SERVICE_ACCOUNT = BlockedStatus("Invalid config option: service-account") - ACTIVE = ActiveStatus("") @@ -208,7 +206,7 @@ def endpoints(self) -> str: @property def database(self) -> str: - """Path allocated for Kafka on ZooKeeper.""" + """Path allocated for Kyuubi on ZooKeeper.""" if not self.relation: return "" @@ -216,12 +214,12 @@ def database(self) -> str: self.data_interface.fetch_relation_field( relation_id=self.relation.id, field="database" ) - or self.chroot + or "" ) @property def uris(self) -> str: - """Comma separated connection string, containing endpoints""" + """Comma separated connection string, containing endpoints.""" if not self.relation: return "" @@ -234,19 +232,7 @@ def uris(self) -> str: or "" ).split(",") ) - ).replace(self.namespace, "") - - @property - def namespace(self) -> str: - """Path allocated for Kyuubi on ZooKeeper.""" - if not self.relation: - return "" - - return ( - self.data_interface.fetch_relation_field(relation_id=self.relation.id, field="chroot") - or "" - ) - + ).replace(self.database, "") @property def zookeeper_connected(self) -> bool: @@ -256,7 +242,11 @@ def zookeeper_connected(self) -> bool: True if ZooKeeper is currently related with sufficient relation data for a broker to connect with. 
Otherwise False """ - if not all([self.username, self.password, self.endpoints, self.database, self.uris]): + if not all([self.username, self.password, self.database, self.uris]): return False return True + + def __bool__(self) -> bool: + """Return whether this class object has sufficient information.""" + return self.zookeeper_connected diff --git a/src/core/workload/kyuubi.py b/src/core/workload/kyuubi.py index 3cdfea8..a570d21 100644 --- a/src/core/workload/kyuubi.py +++ b/src/core/workload/kyuubi.py @@ -29,7 +29,7 @@ def __init__(self, container: Container, user: User = User()): self.user = user def get_ip_address(self) -> str: - """Return the IP address of the unit running the workload""" + """Return the IP address of the unit running the workload.""" hostname = socket.getfqdn() ip_address = socket.gethostbyname(hostname) return ip_address diff --git a/src/events/actions.py b/src/events/actions.py index ff87afa..b789252 100644 --- a/src/events/actions.py +++ b/src/events/actions.py @@ -39,9 +39,7 @@ def _on_get_jdbc_endpoint(self, event: ActionEvent): if not self.workload.ready(): event.fail("The action failed because the workload is not ready yet.") return - if ( - self.get_app_status() != Status.ACTIVE.value - ): + if self.get_app_status() != Status.ACTIVE.value: event.fail("The action failed because the charm is not in active state.") return @@ -68,9 +66,7 @@ def _on_get_password(self, event: ActionEvent) -> None: if not self.workload.ready(): event.fail("The action failed because the workload is not ready yet.") return - if ( - self.get_app_status() != Status.ACTIVE.value - ): + if self.get_app_status() != Status.ACTIVE.value: event.fail("The action failed because the charm is not in active state.") return password = self.auth.get_password(DEFAULT_ADMIN_USERNAME) @@ -93,9 +89,7 @@ def _on_set_password(self, event: ActionEvent) -> None: if not self.workload.ready(): event.fail("The action failed because the workload is not ready yet.") return - if ( - self.get_app_status() != Status.ACTIVE.value - ): + if self.get_app_status() != Status.ACTIVE.value: event.fail("The action failed because the charm is not in active state.") return diff --git a/src/events/base.py b/src/events/base.py index 17b1961..a87507d 100644 --- a/src/events/base.py +++ b/src/events/base.py @@ -39,7 +39,9 @@ def get_app_status( if not self.context.service_account: return Status.MISSING_INTEGRATION_HUB.value - k8s_manager = K8sManager(service_account_info=self.context.service_account, workload=self.workload) + k8s_manager = K8sManager( + service_account_info=self.context.service_account, workload=self.workload + ) # Check whether any one of object storage backend has been configured # Currently, we do this check on the basis of presence of Spark properties @@ -57,9 +59,6 @@ def get_app_status( if not k8s_manager.is_service_account_valid(): return Status.INVALID_SERVICE_ACCOUNT.value - if not self.context.zookeeper: - return Status.MISSING_ZOOKEEPER.value - return Status.ACTIVE.value diff --git a/src/events/zookeeper.py b/src/events/zookeeper.py index ea5a088..eeceae2 100644 --- a/src/events/zookeeper.py +++ b/src/events/zookeeper.py @@ -1,12 +1,18 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Limited +# See LICENSE file for licensing details. 
+ +"""Zookeeper related event handlers.""" from charms.data_platform_libs.v0.data_interfaces import DatabaseRequirerEventHandlers +from ops import CharmBase + from constants import ZOOKEEPER_REL +from core.context import Context from core.workload import KyuubiWorkloadBase from events.base import BaseEventHandler, compute_status -from utils.logging import WithLogging from managers.kyuubi import KyuubiManager -from ops import CharmBase -from core.context import Context +from utils.logging import WithLogging class ZookeeperEvents(BaseEventHandler, WithLogging): @@ -20,38 +26,26 @@ def __init__(self, charm: CharmBase, context: Context, workload: KyuubiWorkloadB self.workload = workload self.kyuubi = KyuubiManager(self.workload) - self.zookeeper_handler = DatabaseRequirerEventHandlers(self.charm, self.context.zookeeper_requirer_data) - - self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_created, self._on_zookeeper_created) - self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_joined, self._on_zookeeper_joined) - self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_changed, self._on_zookeeper_changed) - self.framework.observe(self.charm.on[ZOOKEEPER_REL].relation_broken, self._on_zookeeper_broken) - - - def _on_zookeeper_created(self, _): - self.logger.warning("Zookeeper created...") - self.logger.warning(self.context._zookeeper_relation.data) - - - def _on_zookeeper_joined(self, _): - self.logger.warning("Zookeeper joined...") - self.logger.warning(self.context._zookeeper_relation.data) + self.zookeeper_handler = DatabaseRequirerEventHandlers( + self.charm, self.context.zookeeper_requirer_data + ) + self.framework.observe( + self.charm.on[ZOOKEEPER_REL].relation_changed, self._on_zookeeper_changed + ) + self.framework.observe( + self.charm.on[ZOOKEEPER_REL].relation_broken, self._on_zookeeper_broken + ) @compute_status def _on_zookeeper_changed(self, _): self.logger.info("Zookeeper relation changed new...") - self.logger.info(self.context.zookeeper.uris) - self.logger.info(self.context.zookeeper.username) - self.logger.info(self.context.zookeeper.password) - self.logger.info(self.context.zookeeper.namespace) - self.logger.info(self.context._zookeeper_relation.data) self.kyuubi.update( s3_info=self.context.s3, metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, - zookeeper_info=self.context.zookeeper + zookeeper_info=self.context.zookeeper, ) @compute_status @@ -62,5 +56,5 @@ def _on_zookeeper_broken(self, _): metastore_db_info=self.context.metastore_db, auth_db_info=self.context.auth_db, service_account_info=self.context.service_account, - zookeeper_info=None + zookeeper_info=None, ) diff --git a/src/managers/kyuubi.py b/src/managers/kyuubi.py index 9b3beaa..1ce60fb 100644 --- a/src/managers/kyuubi.py +++ b/src/managers/kyuubi.py @@ -7,7 +7,12 @@ from config.hive import HiveConfig from config.kyuubi import KyuubiConfig from config.spark import SparkConfig -from core.domain import DatabaseConnectionInfo, S3ConnectionInfo, SparkServiceAccountInfo, ZookeeperInfo +from core.domain import ( + DatabaseConnectionInfo, + S3ConnectionInfo, + SparkServiceAccountInfo, + ZookeeperInfo, +) from core.workload import KyuubiWorkloadBase from utils.logging import WithLogging diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 2c77c3e..7a8807e 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -62,6 +62,7 @@ class 
IntegrationTestsCharms(BaseModel): s3: TestCharm postgres: TestCharm integration_hub: TestCharm + zookeeper: TestCharm @pytest.fixture(scope="module") @@ -88,6 +89,14 @@ def charm_versions() -> IntegrationTestsCharms: "trust": True, } ), + zookeeper=TestCharm( + **{ + "name": "zookeeper-k8s", + "channel": "3/edge", + "series": "jammy", + "alias": "zookeeper", + } + ) ) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 5fe720b..a0a1276 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -822,6 +822,92 @@ async def test_kyuubi_client_relation_removed(ops_test: OpsTest, test_pod, charm assert "Error validating the login" in process.stderr.decode() + +@pytest.mark.abort_on_fail +async def test_integration_with_zookeeper( + ops_test: OpsTest, charm_versions +): + """Test the charm by integrating it with Zookeeper.""" + # Deploy the charm and wait for waiting status + logger.info("Deploying zookeeper-k8s charm...") + await ops_test.model.deploy(**charm_versions.zookeeper.deploy_dict()), + + logger.info("Waiting for zookeeper app to be active and idle...") + await ops_test.model.wait_for_idle( + apps=[APP_NAME, charm_versions.zookeeper.application_name], timeout=1000, status="active" + ) + + logger.info("Integrating kyuubi charm with zookeeper charm...") + await ops_test.model.integrate(charm_versions.zookeeper.application_name, APP_NAME) + + logger.info("Waiting for zookeeper-k8s and kyuubi charms to be idle...") + await ops_test.model.wait_for_idle( + apps=[APP_NAME, charm_versions.s3.application_name], timeout=1000 + ) + + # Assert that both kyuubi-k8s and zookeeper-k8s charms are in active state + assert check_status( + ops_test.model.applications[APP_NAME], Status.ACTIVE.value + ) + assert ops_test.model.applications[charm_versions.zookeeper.application_name].status == "active" + + +@pytest.mark.abort_on_fail +async def test_remove_zookeeper_relation(ops_test: OpsTest, test_pod, charm_versions): + """Test the charm after the zookeeper relation has been broken.""" + + logger.info("Removing relation between zookeeper-k8s and kyuubi-k8s...") + await ops_test.model.applications[APP_NAME].remove_relation( + f"{APP_NAME}:zookeeper", f"{charm_versions.zookeeper.application_name}:zookeeper" + ) + + logger.info("Waiting for zookeeper-k8s and kyuubi-k8s apps to be idle and active...") + await ops_test.model.wait_for_idle( + apps=[APP_NAME, charm_versions.zookeeper.application_name], timeout=1000, status="active" + ) + + # Assert that both kyuubi-k8s and zookeeper-k8s charms are in active state + assert check_status( + ops_test.model.applications[APP_NAME], Status.ACTIVE.value + ) + assert ops_test.model.applications[charm_versions.zookeeper.application_name].status == "active" + + logger.info( + "Waiting for extra 30 seconds as cool-down period before proceeding with the test..." 
+ ) + time.sleep(30) + + logger.info("Running action 'get-jdbc-endpoint' on kyuubi-k8s unit...") + kyuubi_unit = ops_test.model.applications[APP_NAME].units[0] + action = await kyuubi_unit.run_action( + action_name="get-jdbc-endpoint", + ) + result = await action.wait() + + jdbc_endpoint = result.results.get("endpoint") + logger.info(f"JDBC endpoint: {jdbc_endpoint}") + + logger.info("Testing JDBC endpoint by connecting with beeline with no credentials ...") + process = subprocess.run( + [ + "./tests/integration/test_jdbc_endpoint.sh", + test_pod, + jdbc_endpoint, + "db_555", + "table_555", + ], + capture_output=True, + ) + print("========== test_jdbc_endpoint.sh STDOUT =================") + print(process.stdout.decode()) + print("========== test_jdbc_endpoint.sh STDERR =================") + print(process.stderr.decode()) + logger.info(f"JDBC endpoint test returned with status {process.returncode}") + assert process.returncode == 0 + + + + @pytest.mark.abort_on_fail async def test_remove_authentication(ops_test: OpsTest, test_pod, charm_versions): """Test the JDBC connection when authentication is disabled.""" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 2c54656..88b1312 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -7,7 +7,7 @@ from scenario.state import next_relation_id from charm import KyuubiCharm -from constants import KYUUBI_CONTAINER_NAME, S3_INTEGRATOR_REL, SPARK_SERVICE_ACCOUNT_REL +from constants import KYUUBI_CONTAINER_NAME, S3_INTEGRATOR_REL, SPARK_SERVICE_ACCOUNT_REL, ZOOKEEPER_REL @pytest.fixture @@ -93,3 +93,23 @@ def spark_service_account_relation(): local_app_data={"service-account": "kyuubi", "namespace": "spark"}, remote_app_data={"service-account": "kyuubi", "namespace": "spark"}, ) + + +@pytest.fixture +def zookeeper_relation(): + """Provide fixture for the Zookeeper relation.""" + relation_id = next_relation_id(update=True) + + return Relation( + endpoint=ZOOKEEPER_REL, + interface="zookeeper", + remote_app_name="zookeeper-k8s", + relation_id=relation_id, + local_app_data={"database": f"/kyuubi"}, + remote_app_data={ + "uris": "host1:2181,host2:2181,host3:2181", + "username": "foobar", + "password": "foopassbarword", + "database": "/kyuubi", + }, + ) \ No newline at end of file diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 46f86b0..64a1af8 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -24,6 +24,14 @@ def parse_spark_properties(tmp_path: Path) -> dict[str, str]: row.rsplit("=", maxsplit=1) for line in fid.readlines() if (row := line.strip()) ) +def parse_kyuubi_configurations(tmp_path: Path) -> dict[str, str]: + """Parse and return Kyuubi configurations from the conf file in the container.""" + file_path = tmp_path / Path(KyuubiWorkload.KYUUBI_CONFIGURATION_FILE).relative_to("/opt") + with file_path.open("r") as fid: + return dict( + row.rsplit("=", maxsplit=1) for line in fid.readlines() if (row := line.strip()) + ) + def test_start_kyuubi(kyuubi_context): state = State( @@ -204,6 +212,82 @@ def test_object_storage_backend_removed( assert state_after_relation_broken.unit_status == Status.MISSING_OBJECT_STORAGE_BACKEND.value +@patch("managers.s3.S3Manager.verify", return_value=True) +@patch("managers.k8s.K8sManager.is_namespace_valid", return_value=True) +@patch("managers.k8s.K8sManager.is_service_account_valid", return_value=True) +@patch("managers.k8s.K8sManager.is_s3_configured", return_value=True) +@patch("config.spark.SparkConfig._get_spark_master", 
return_value="k8s://https://spark.master") +@patch("config.spark.SparkConfig._sa_conf", return_value={}) +def test_zookeeper_relation_joined( + mock_sa_conf, + mock_get_master, + mock_s3_configured, + mock_valid_sa, + mock_valid_ns, + mock_s3_verify, + tmp_path, + kyuubi_context, + kyuubi_container, + s3_relation, + spark_service_account_relation, + zookeeper_relation +): + state = State( + relations=[s3_relation, spark_service_account_relation, zookeeper_relation], + containers=[kyuubi_container], + ) + out = kyuubi_context.run(zookeeper_relation.changed_event, state) + assert out.unit_status == Status.ACTIVE.value + + kyuubi_configurations = parse_kyuubi_configurations(tmp_path) + logger.info(kyuubi_configurations) + print(kyuubi_configurations) + print(tmp_path) + + # Assert some of the keys + assert kyuubi_configurations["kyuubi.ha.namespace"] == zookeeper_relation.remote_app_data["database"] + assert kyuubi_configurations["kyuubi.ha.addresses"] == zookeeper_relation.remote_app_data["uris"] + assert kyuubi_configurations["kyuubi.ha.zookeeper.auth.type"] == "DIGEST" + assert kyuubi_configurations["kyuubi.ha.zookeeper.auth.digest"] == f"{zookeeper_relation.remote_app_data['username']}:{zookeeper_relation.remote_app_data['password']}" + + +@patch("managers.s3.S3Manager.verify", return_value=True) +@patch("managers.k8s.K8sManager.is_namespace_valid", return_value=True) +@patch("managers.k8s.K8sManager.is_service_account_valid", return_value=True) +@patch("managers.k8s.K8sManager.is_s3_configured", return_value=True) +@patch("config.spark.SparkConfig._get_spark_master", return_value="k8s://https://spark.master") +@patch("config.spark.SparkConfig._sa_conf", return_value={}) +def test_zookeeper_relation_broken( + mock_sa_conf, + mock_get_master, + mock_s3_configured, + mock_valid_sa, + mock_valid_ns, + mock_s3_verify, + tmp_path, + kyuubi_context, + kyuubi_container, + s3_relation, + spark_service_account_relation, + zookeeper_relation +): + state = State( + relations=[s3_relation, spark_service_account_relation, zookeeper_relation], + containers=[kyuubi_container], + ) + state_after_relation_changed = kyuubi_context.run(zookeeper_relation.changed_event, state) + state_after_relation_broken = kyuubi_context.run(zookeeper_relation.broken_event, state_after_relation_changed) + assert state_after_relation_broken.unit_status == Status.ACTIVE.value + + kyuubi_configurations = parse_kyuubi_configurations(tmp_path) + + # Assert HA configurations do not exist in Kyuubi configuration file + assert "kyuubi.ha.namespace" not in kyuubi_configurations + assert "kyuubi.ha.addresses" not in kyuubi_configurations + assert "kyuubi.ha.zookeeper.auth.type" not in kyuubi_configurations + assert "kyuubi.ha.zookeeper.auth.digest" not in kyuubi_configurations + + @patch("managers.s3.S3Manager.verify", return_value=True) @patch("managers.k8s.K8sManager.is_namespace_valid", return_value=True) @patch("managers.k8s.K8sManager.is_service_account_valid", return_value=True) From 0f5eab1deec34623175589ce01a6962a5b31721b Mon Sep 17 00:00:00 2001 From: Bikalpa Dhakal Date: Thu, 22 Aug 2024 08:22:03 +0000 Subject: [PATCH 5/8] Fix linters and formatters --- tests/integration/conftest.py | 2 +- tests/integration/test_charm.py | 20 +++++++------------- tests/unit/conftest.py | 11 ++++++++--- tests/unit/test_charm.py | 26 +++++++++++++++++--------- 4 files changed, 33 insertions(+), 26 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 7a8807e..c3c93d4 100644 --- 
a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -96,7 +96,7 @@ def charm_versions() -> IntegrationTestsCharms: "series": "jammy", "alias": "zookeeper", } - ) + ), ) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index a0a1276..2e11481 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -822,11 +822,8 @@ async def test_kyuubi_client_relation_removed(ops_test: OpsTest, test_pod, charm assert "Error validating the login" in process.stderr.decode() - @pytest.mark.abort_on_fail -async def test_integration_with_zookeeper( - ops_test: OpsTest, charm_versions -): +async def test_integration_with_zookeeper(ops_test: OpsTest, charm_versions): """Test the charm by integrating it with Zookeeper.""" # Deploy the charm and wait for waiting status logger.info("Deploying zookeeper-k8s charm...") @@ -846,16 +843,15 @@ async def test_integration_with_zookeeper( ) # Assert that both kyuubi-k8s and zookeeper-k8s charms are in active state - assert check_status( - ops_test.model.applications[APP_NAME], Status.ACTIVE.value + assert check_status(ops_test.model.applications[APP_NAME], Status.ACTIVE.value) + assert ( + ops_test.model.applications[charm_versions.zookeeper.application_name].status == "active" ) - assert ops_test.model.applications[charm_versions.zookeeper.application_name].status == "active" @pytest.mark.abort_on_fail async def test_remove_zookeeper_relation(ops_test: OpsTest, test_pod, charm_versions): """Test the charm after the zookeeper relation has been broken.""" - logger.info("Removing relation between zookeeper-k8s and kyuubi-k8s...") await ops_test.model.applications[APP_NAME].remove_relation( f"{APP_NAME}:zookeeper", f"{charm_versions.zookeeper.application_name}:zookeeper" @@ -867,10 +863,10 @@ async def test_remove_zookeeper_relation(ops_test: OpsTest, test_pod, charm_vers ) # Assert that both kyuubi-k8s and zookeeper-k8s charms are in active state - assert check_status( - ops_test.model.applications[APP_NAME], Status.ACTIVE.value + assert check_status(ops_test.model.applications[APP_NAME], Status.ACTIVE.value) + assert ( + ops_test.model.applications[charm_versions.zookeeper.application_name].status == "active" ) - assert ops_test.model.applications[charm_versions.zookeeper.application_name].status == "active" logger.info( "Waiting for extra 30 seconds as cool-down period before proceeding with the test..." 
@@ -906,8 +902,6 @@ async def test_remove_zookeeper_relation(ops_test: OpsTest, test_pod, charm_vers assert process.returncode == 0 - - @pytest.mark.abort_on_fail async def test_remove_authentication(ops_test: OpsTest, test_pod, charm_versions): """Test the JDBC connection when authentication is disabled.""" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 88b1312..9cc8537 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -7,7 +7,12 @@ from scenario.state import next_relation_id from charm import KyuubiCharm -from constants import KYUUBI_CONTAINER_NAME, S3_INTEGRATOR_REL, SPARK_SERVICE_ACCOUNT_REL, ZOOKEEPER_REL +from constants import ( + KYUUBI_CONTAINER_NAME, + S3_INTEGRATOR_REL, + SPARK_SERVICE_ACCOUNT_REL, + ZOOKEEPER_REL, +) @pytest.fixture @@ -105,11 +110,11 @@ def zookeeper_relation(): interface="zookeeper", remote_app_name="zookeeper-k8s", relation_id=relation_id, - local_app_data={"database": f"/kyuubi"}, + local_app_data={"database": "/kyuubi"}, remote_app_data={ "uris": "host1:2181,host2:2181,host3:2181", "username": "foobar", "password": "foopassbarword", "database": "/kyuubi", }, - ) \ No newline at end of file + ) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 64a1af8..3f8984b 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -24,6 +24,7 @@ def parse_spark_properties(tmp_path: Path) -> dict[str, str]: row.rsplit("=", maxsplit=1) for line in fid.readlines() if (row := line.strip()) ) + def parse_kyuubi_configurations(tmp_path: Path) -> dict[str, str]: """Parse and return Kyuubi configurations from the conf file in the container.""" file_path = tmp_path / Path(KyuubiWorkload.KYUUBI_CONFIGURATION_FILE).relative_to("/opt") @@ -230,7 +231,7 @@ def test_zookeeper_relation_joined( kyuubi_container, s3_relation, spark_service_account_relation, - zookeeper_relation + zookeeper_relation, ): state = State( relations=[s3_relation, spark_service_account_relation, zookeeper_relation], @@ -240,15 +241,20 @@ def test_zookeeper_relation_joined( assert out.unit_status == Status.ACTIVE.value kyuubi_configurations = parse_kyuubi_configurations(tmp_path) - logger.info(kyuubi_configurations) - print(kyuubi_configurations) - print(tmp_path) # Assert some of the keys - assert kyuubi_configurations["kyuubi.ha.namespace"] == zookeeper_relation.remote_app_data["database"] - assert kyuubi_configurations["kyuubi.ha.addresses"] == zookeeper_relation.remote_app_data["uris"] + assert ( + kyuubi_configurations["kyuubi.ha.namespace"] + == zookeeper_relation.remote_app_data["database"] + ) + assert ( + kyuubi_configurations["kyuubi.ha.addresses"] == zookeeper_relation.remote_app_data["uris"] + ) assert kyuubi_configurations["kyuubi.ha.zookeeper.auth.type"] == "DIGEST" - assert kyuubi_configurations["kyuubi.ha.zookeeper.auth.digest"] == f"{zookeeper_relation.remote_app_data['username']}:{zookeeper_relation.remote_app_data['password']}" + assert ( + kyuubi_configurations["kyuubi.ha.zookeeper.auth.digest"] + == f"{zookeeper_relation.remote_app_data['username']}:{zookeeper_relation.remote_app_data['password']}" + ) @patch("managers.s3.S3Manager.verify", return_value=True) @@ -269,14 +275,16 @@ def test_zookeeper_relation_broken( kyuubi_container, s3_relation, spark_service_account_relation, - zookeeper_relation + zookeeper_relation, ): state = State( relations=[s3_relation, spark_service_account_relation, zookeeper_relation], containers=[kyuubi_container], ) state_after_relation_changed = 
kyuubi_context.run(zookeeper_relation.changed_event, state) - state_after_relation_broken = kyuubi_context.run(zookeeper_relation.broken_event, state_after_relation_changed) + state_after_relation_broken = kyuubi_context.run( + zookeeper_relation.broken_event, state_after_relation_changed + ) assert state_after_relation_broken.unit_status == Status.ACTIVE.value kyuubi_configurations = parse_kyuubi_configurations(tmp_path) From b68826fa01cf1a25f25021600c95327992285fe2 Mon Sep 17 00:00:00 2001 From: Bikalpa Dhakal Date: Thu, 22 Aug 2024 08:26:06 +0000 Subject: [PATCH 6/8] Shorten remove-zookeeper test --- tests/integration/test_charm.py | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 2e11481..6bce5a3 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -868,39 +868,6 @@ async def test_remove_zookeeper_relation(ops_test: OpsTest, test_pod, charm_vers ops_test.model.applications[charm_versions.zookeeper.application_name].status == "active" ) - logger.info( - "Waiting for extra 30 seconds as cool-down period before proceeding with the test..." - ) - time.sleep(30) - - logger.info("Running action 'get-jdbc-endpoint' on kyuubi-k8s unit...") - kyuubi_unit = ops_test.model.applications[APP_NAME].units[0] - action = await kyuubi_unit.run_action( - action_name="get-jdbc-endpoint", - ) - result = await action.wait() - - jdbc_endpoint = result.results.get("endpoint") - logger.info(f"JDBC endpoint: {jdbc_endpoint}") - - logger.info("Testing JDBC endpoint by connecting with beeline with no credentials ...") - process = subprocess.run( - [ - "./tests/integration/test_jdbc_endpoint.sh", - test_pod, - jdbc_endpoint, - "db_555", - "table_555", - ], - capture_output=True, - ) - print("========== test_jdbc_endpoint.sh STDOUT =================") - print(process.stdout.decode()) - print("========== test_jdbc_endpoint.sh STDERR =================") - print(process.stderr.decode()) - logger.info(f"JDBC endpoint test returned with status {process.returncode}") - assert process.returncode == 0 - @pytest.mark.abort_on_fail async def test_remove_authentication(ops_test: OpsTest, test_pod, charm_versions): From f86fdc5a839f0bdf37d2cb2288ea8fa100133fb0 Mon Sep 17 00:00:00 2001 From: Bikalpa Dhakal Date: Fri, 23 Aug 2024 10:41:26 +0000 Subject: [PATCH 7/8] wip --- src/config/kyuubi.py | 4 ++-- src/constants.py | 1 + src/core/context.py | 4 ++-- src/events/actions.py | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/config/kyuubi.py b/src/config/kyuubi.py index bcddaba..6ed99b9 100644 --- a/src/config/kyuubi.py +++ b/src/config/kyuubi.py @@ -7,7 +7,7 @@ from typing import Optional -from constants import AUTHENTICATION_TABLE_NAME +from constants import AUTHENTICATION_TABLE_NAME, HA_ZNODE_NAME from core.domain import DatabaseConnectionInfo, ZookeeperInfo from utils.logging import WithLogging @@ -57,7 +57,7 @@ def _ha_conf(self) -> dict[str, str]: return {} return { "kyuubi.ha.addresses": self.zookeeper_info.uris, - "kyuubi.ha.namespace": self.zookeeper_info.database, + "kyuubi.ha.namespace": HA_ZNODE_NAME, "kyuubi.ha.zookeeper.auth.type": "DIGEST", "kyuubi.ha.zookeeper.auth.digest": self._get_zookeeper_auth_digest(), } diff --git a/src/constants.py b/src/constants.py index a7bfaa2..c27ac96 100644 --- a/src/constants.py +++ b/src/constants.py @@ -31,3 +31,4 @@ # Zookeeper literals HA_ZNODE_NAME = "/kyuubi" +HA_ZNODE_NAME_TEMP = "/kyuubi-temp" \ No newline at end of 
file diff --git a/src/core/context.py b/src/core/context.py index 6915f98..c2b8ef7 100644 --- a/src/core/context.py +++ b/src/core/context.py @@ -10,7 +10,7 @@ from common.relation.spark_sa import RequirerData from constants import ( AUTHENTICATION_DATABASE_NAME, - HA_ZNODE_NAME, + HA_ZNODE_NAME_TEMP, METASTORE_DATABASE_NAME, POSTGRESQL_AUTH_DB_REL, POSTGRESQL_METASTORE_DB_REL, @@ -43,7 +43,7 @@ def __init__(self, model: Model, config: ConfigData): extra_user_roles="superuser", ) self.zookeeper_requirer_data = DatabaseRequirerData( - self.model, ZOOKEEPER_REL, database_name=HA_ZNODE_NAME + self.model, ZOOKEEPER_REL, database_name=HA_ZNODE_NAME_TEMP ) @property diff --git a/src/events/actions.py b/src/events/actions.py index b789252..dbb83ba 100644 --- a/src/events/actions.py +++ b/src/events/actions.py @@ -45,7 +45,7 @@ def _on_get_jdbc_endpoint(self, event: ActionEvent): if self.context.is_ha_enabled(): address = self.context.zookeeper.uris - namespace = self.context.zookeeper.namespace + namespace = self.context.zookeeper.database if not address.endswith("/"): address += "/" endpoint = f"jdbc:hive2://{address};serviceDiscoveryMode=zooKeeper;zooKeeperNamespace={namespace}" From 88067ce4b7f211b37ecefaad84c6570785be50df Mon Sep 17 00:00:00 2001 From: Bikalpa Dhakal Date: Sat, 24 Aug 2024 04:44:46 +0000 Subject: [PATCH 8/8] Fix integration tests --- lib/charms/zookeeper/v0/client.py | 610 ------------------ src/config/kyuubi.py | 2 + src/constants.py | 2 +- src/core/context.py | 4 + src/core/domain.py | 1 + src/events/actions.py | 6 +- src/events/base.py | 3 + tests/integration/conftest.py | 22 +- .../setup/testpod_spec.yaml.template | 1 + tests/integration/test_charm.py | 141 +++- tests/integration/test_jdbc_endpoint.sh | 15 +- 11 files changed, 149 insertions(+), 658 deletions(-) delete mode 100644 lib/charms/zookeeper/v0/client.py diff --git a/lib/charms/zookeeper/v0/client.py b/lib/charms/zookeeper/v0/client.py deleted file mode 100644 index e6ce1c0..0000000 --- a/lib/charms/zookeeper/v0/client.py +++ /dev/null @@ -1,610 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. - -"""ZooKeeperManager and ZooKeeperClient classes - -`ZooKeeperManager` provides an interface for performing actions that requires -a connection to the current ZK quorum leader, e.g updating zNodes, ACLs and quorum members. -On `__init__`, it loops through all passed hosts, attempts a `ZooKeeperClient` connection, and -checks leadership of each unit, storing the current quorum leader host as an attribute. - -In most cases, custom `Exception`s raised by `ZooKeeperManager` should trigger an `event.defer()`, -as they indicate that the servers are not ready to have actions performed upon them just yet. - -`ZooKeeperClient` serves as a handler for managing a ZooKeeper client connection to a -single unit. It's methods contain common 4lw commands, and functionality to read/write -to specific zNodes. -It is not expected to use this class from directly from charm code, -but to instead use the `ZooKeeperManager` class to perform it's actions on the ZK servers. - - -Instances of `ZooKeeperManager` are to be created by methods in either the `Charm` itself, -or from another library. 
- -Example usage for `ZooKeeperManager`: - -```python - -def update_cluster(new_members: List[str], event: EventBase) -> None: - - try: - zk = ZooKeeperManager( - hosts=["10.141.73.20", "10.141.73.21"], - client_port=2181, - username="super", - password="password" - ) - - current_quorum_members = zk.server_members - - servers_to_remove = list(current_quorum_members - new_members) - zk.remove_members(servers_to_remove) - - servers_to_add = sorted(new_members - current_quorum_members) - zk.add_members(servers_to_add) - - except ( - MembersSyncingError, - MemberNotReadyError, - QuorumLeaderNotFoundError, - ) as e: - logger.info(str(e)) - event.defer() - return -``` -""" - -import logging -import re -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple - -from kazoo.client import ACL, KazooClient -from kazoo.handlers.threading import KazooTimeoutError -from tenacity import RetryError, retry -from tenacity.retry import retry_if_not_result -from tenacity.stop import stop_after_attempt -from tenacity.wait import wait_fixed - -# The unique Charmhub library identifier, never change it -LIBID = "4dc4430e6e5d492699391f57bd697fce" - -# Increment this major API version when introducing breaking changes -LIBAPI = 0 - -# Increment this PATCH version before using `charmcraft publish-lib` or reset -# to 0 if you are raising the major API version -LIBPATCH = 6 - - -logger = logging.getLogger(__name__) - -# Kazoo logs are unbearably chatty -logging.getLogger("kazoo.client").disabled = True - - -class MembersSyncingError(Exception): - """Generic exception for when quorum members are syncing data.""" - - pass - - -class MemberNotReadyError(Exception): - """Generic exception for when a zk unit can't be connected to or is not broadcasting.""" - - pass - - -class QuorumLeaderNotFoundError(Exception): - """Generic exception for when there are no zk leaders in the app.""" - - pass - - -class ZooKeeperManager: - """Handler for performing ZK commands.""" - - def __init__( - self, - hosts: List[str], - username: str, - password: str, - client_port: int = 2181, - use_ssl: bool = False, - keyfile_path: Optional[str] = "", - keyfile_password: Optional[str] = "", - certfile_path: Optional[str] = "", - ): - self.hosts = hosts - self.username = username - self.password = password - self.client_port = client_port - self.use_ssl = use_ssl - self.keyfile_path = keyfile_path - self.keyfile_password = keyfile_password - self.certfile_path = certfile_path - self.leader = "" - - try: - self.leader = self.get_leader() - except RetryError: - raise QuorumLeaderNotFoundError("quorum leader not found") - - @retry( - wait=wait_fixed(3), - stop=stop_after_attempt(2), - retry=retry_if_not_result(lambda result: True if result else False), - ) - def get_leader(self) -> str: - """Attempts to find the current ZK quorum leader. - - In the case when there is a leadership election, this may fail. - When this happens, we attempt 1 retry after 3 seconds. 
- - Returns: - String of the host for the quorum leader - - Raises: - tenacity.RetryError: if the leader can't be found during the retry conditions - """ - leader = None - for host in self.hosts: - try: - with ZooKeeperClient( - host=host, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - response = zk.srvr - if response.get("Mode") == "leader": - leader = host - break - except KazooTimeoutError: # in the case of having a dead unit in relation data - logger.debug(f"TIMEOUT - {host}") - continue - - return leader or "" - - @property - def server_members(self) -> Set[str]: - """The current members within the ZooKeeper quorum. - - Returns: - A set of ZK member strings - e.g {"server.1=10.141.78.207:2888:3888:participant;0.0.0.0:2181"} - """ - with ZooKeeperClient( - host=self.leader, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - members, _ = zk.config - - return set(members) - - @property - def config_version(self) -> int: - """The current config version for ZooKeeper. - - Returns: - The zookeeper config version decoded from base16 - """ - with ZooKeeperClient( - host=self.leader, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - _, version = zk.config - - return version - - @property - def members_syncing(self) -> bool: - """Flag to check if any quorum members are currently syncing data. - - Returns: - True if any members are syncing. Otherwise False. - """ - with ZooKeeperClient( - host=self.leader, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - result = zk.mntr - - try: - zk_pending_syncs = result["zk_pending_syncs"] - except KeyError: # missing key, no quorum, no syncing - return False - - if ( - result.get("zk_peer_state", "") == "leading - broadcast" - and float(zk_pending_syncs) == 0 - ): - return False - - return True - - @property - def members_broadcasting(self) -> bool: - """Flag to check if any quorum members are currently broadcasting. - - Returns: - True if any members are currently broadcasting. Otherwise False. - """ - for host in self.hosts: - try: - with ZooKeeperClient( - host=host, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - if not zk.is_ready: - return False - except KazooTimeoutError: # in the case of having a dead unit in relation data - logger.debug(f"TIMEOUT - {host}") - return False - - return True - - def add_members(self, members: Iterable[str]) -> None: - """Adds new members to the members' dynamic config. 
- - Raises: - MembersSyncingError: if any members are busy syncing data - MemberNotReadyError: if any members are not yet broadcasting - """ - if self.members_syncing: - raise MembersSyncingError("Unable to add members - some members are syncing") - - for member in members: - host = member.split("=")[1].split(":")[0] - - try: - # individual connections to each server - with ZooKeeperClient( - host=host, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - if not zk.is_ready: - raise MemberNotReadyError(f"Server is not ready: {host}") - except KazooTimeoutError as e: # for when units are departing - logger.debug(str(e)) - continue - - # specific connection to leader - with ZooKeeperClient( - host=self.leader, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - zk.client.reconfig( - joining=member, leaving=None, new_members=None, from_config=self.config_version - ) - - def remove_members(self, members: Iterable[str]) -> None: - """Removes members from the members' dynamic config. - - Raises: - MembersSyncingError: if any members are busy syncing data - """ - if self.members_syncing: - raise MembersSyncingError("Unable to remove members - some members are syncing") - - for member in members: - member_id = re.findall(r"server.([0-9]+)", member)[0] - with ZooKeeperClient( - host=self.leader, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - zk.client.reconfig( - joining=None, - leaving=member_id, - new_members=None, - from_config=self.config_version, - ) - - def leader_znodes(self, path: str) -> Set[str]: - """Grabs all children zNodes for a path on the current quorum leader. - - Args: - path: the 'root' path to search from - - Returns: - Set of all nested child zNodes - """ - with ZooKeeperClient( - host=self.leader, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - all_znode_children = zk.get_all_znode_children(path=path) - - return all_znode_children - - def create_znode_leader(self, path: str, acls: List[ACL]) -> None: - """Creates a new zNode on the current quorum leader with given ACLs. - - Args: - path: the zNode path to set - acls: the ACLs to be set on that path - """ - with ZooKeeperClient( - host=self.leader, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - zk.create_znode(path=path, acls=acls) - - def set_acls_znode_leader(self, path: str, acls: List[ACL]) -> None: - """Updates ACLs for an existing zNode on the current quorum leader. 
- - Args: - path: the zNode path to update - acls: the new ACLs to be set on that path - """ - with ZooKeeperClient( - host=self.leader, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - zk.set_acls(path=path, acls=acls) - - def delete_znode_leader(self, path: str) -> None: - """Deletes a zNode path from the current quorum leader. - - Args: - path: the zNode path to delete - """ - with ZooKeeperClient( - host=self.leader, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - zk.delete_znode(path=path) - - def get_version(self) -> str: - """Get ZooKeeper service version from srvr 4lw. - - Returns: - String of ZooKeeper service version - """ - with ZooKeeperClient( - host=self.leader, - client_port=self.client_port, - username=self.username, - password=self.password, - use_ssl=self.use_ssl, - keyfile_path=self.keyfile_path, - keyfile_password=self.keyfile_password, - certfile_path=self.certfile_path, - ) as zk: - return zk.srvr["Zookeeper version"].split("-", maxsplit=1)[0] - - -class ZooKeeperClient: - """Handler for ZooKeeper connections and running 4lw client commands.""" - - def __init__( - self, - host: str, - client_port: int, - username: str, - password: str, - use_ssl: bool = False, - keyfile_path: Optional[str] = None, - keyfile_password: Optional[str] = None, - certfile_path: Optional[str] = None, - ): - self.host = host - self.client_port = client_port - self.username = username - self.password = password - self.client = KazooClient( - hosts=f"{host}:{client_port}", - timeout=1.0, - sasl_options={"mechanism": "DIGEST-MD5", "username": username, "password": password}, - keyfile=keyfile_path, - keyfile_password=keyfile_password, - certfile=certfile_path, - verify_certs=False, - use_ssl=use_ssl, - ) - self.client.start() - - def __enter__(self): - return self - - def __exit__(self, object_type, value, traceback): - self.client.stop() - - def _run_4lw_command(self, command: str): - return self.client.command(command.encode()) - - @property - def config(self) -> Tuple[List[str], int]: - """Retrieves the dynamic config for a ZooKeeper service. - - Returns: - Tuple of the decoded config list, and decoded config version - """ - response = self.client.get("/zookeeper/config") - if response: - result = str(response[0].decode("utf-8")).splitlines() - version = int(result.pop(-1).split("=")[1], base=16) - else: - raise - - return result, version - - @property - def srvr(self) -> Dict[str, Any]: - """Retrieves attributes returned from the 'srvr' 4lw command. - - Returns: - Mapping of field and setting returned from `srvr` - """ - response = self._run_4lw_command("srvr") - - result = {} - for item in response.splitlines(): - k = re.split(": ", item)[0] - v = re.split(": ", item)[1] - result[k] = v - - return result - - @property - def mntr(self) -> Dict[str, Any]: - """Retrieves attributes returned from the 'mntr' 4lw command. 
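The client constructor above is the part that ties this library to the rest of the patch: it opens a DIGEST-MD5 SASL session, the counterpart of the `kyuubi.ha.zookeeper.auth.type: DIGEST` option being added to the Kyuubi config. A compact sketch, not part of the patch, of the same wiring plus the `/zookeeper/config` decoding performed by the `config` property, with placeholder endpoint and credentials and no TLS:

from kazoo.client import KazooClient

zk = KazooClient(
    hosts="10.141.78.207:2181",  # placeholder endpoint
    timeout=1.0,
    sasl_options={"mechanism": "DIGEST-MD5", "username": "super", "password": "secret"},
)
zk.start()
try:
    data, _stat = zk.get("/zookeeper/config")
    lines = data.decode("utf-8").splitlines()
    members = lines[:-1]                             # "server.N=host:2888:3888:participant;..." entries
    version = int(lines[-1].split("=")[1], base=16)  # last line is e.g. "version=100000000"
    print(version, members)
finally:
    zk.stop()
    zk.close()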
- - Returns: - Mapping of field and setting returned from `mntr` - """ - response = self._run_4lw_command("mntr") - - result = {} - for item in response.splitlines(): - if re.search("=|\\t", item): - k = re.split("=|\\t", item)[0] - v = re.split("=|\\t", item)[1] - result[k] = v - else: - result[item] = "" - - return result - - @property - def is_ready(self) -> bool: - """Flag to confirm connected ZooKeeper server is connected and broadcasting. - - Returns: - True if server is broadcasting. Otherwise False. - """ - if self.client.connected: - return "broadcast" in self.mntr.get("zk_peer_state", "") - return False - - def get_all_znode_children(self, path: str) -> Set[str]: - """Recursively gets all children for a given parent znode path. - - Args: - path: the desired parent znode path to recurse - - Returns: - Set of all nested children znode paths for the given parent - """ - children = self.client.get_children(path) or [] - - result = set() - for child in children: - if path + child != "/zookeeper": - result.update(self.get_all_znode_children(path.rstrip("/") + "/" + child)) - if path != "/": - result.add(path) - - return result - - def delete_znode(self, path: str) -> None: - """Drop znode and all it's children from ZK tree. - - Args: - path: the desired znode path to delete - """ - if not self.client.exists(path): - return - self.client.delete(path, recursive=True) - - def create_znode(self, path: str, acls: List[ACL]) -> None: - """Create new znode. - - Args: - path: the desired znode path to create - acls: the acls for the new znode - """ - self.client.create(path, acl=acls, makepath=True) - - def get_acls(self, path: str) -> List[ACL]: - """Gets acls for a desired znode path. - - Args: - path: the desired znode path - - Returns: - List of the acls set for the given znode - """ - acl_list = self.client.get_acls(path) - - return acl_list if acl_list else [] - - def set_acls(self, path: str, acls: List[ACL]) -> None: - """Sets acls for a desired znode path. 
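The znode helpers above reduce to Kazoo primitives plus a small recursive walk. A sketch under the assumption of a digest-authenticated session; the path and credentials are placeholders, not the values the charm actually uses:

from kazoo.client import KazooClient
from kazoo.security import make_digest_acl

zk = KazooClient(hosts="10.141.78.207:2181", timeout=1.0)  # placeholder endpoint
zk.start()
try:
    zk.add_auth("digest", "kyuubi:secret")  # placeholder credentials
    acl = make_digest_acl("kyuubi", "secret", all=True)
    zk.create("/kyuubi-temp/nested", acl=[acl], makepath=True)

    def walk(path):
        """Collect a path and all of its descendants, like get_all_znode_children."""
        found = {path}
        for child in zk.get_children(path) or []:
            found |= walk(path.rstrip("/") + "/" + child)
        return found

    print(walk("/kyuubi-temp"))
    zk.delete("/kyuubi-temp", recursive=True)  # recursive drop, as delete_znode does
finally:
    zk.stop()
    zk.close()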
- - Args: - path: the desired znode path - acls: the acls to set to the given znode - """ - self.client.set_acls(path, acls) - diff --git a/src/config/kyuubi.py b/src/config/kyuubi.py index 6ed99b9..ba9ce84 100644 --- a/src/config/kyuubi.py +++ b/src/config/kyuubi.py @@ -57,6 +57,8 @@ def _ha_conf(self) -> dict[str, str]: return {} return { "kyuubi.ha.addresses": self.zookeeper_info.uris, + # FIXME: Get this value from self.context.zookeeper.uris when znode created by + # zookeeper charm has enough permissions for Kyuubi to work "kyuubi.ha.namespace": HA_ZNODE_NAME, "kyuubi.ha.zookeeper.auth.type": "DIGEST", "kyuubi.ha.zookeeper.auth.digest": self._get_zookeeper_auth_digest(), diff --git a/src/constants.py b/src/constants.py index c27ac96..7985b55 100644 --- a/src/constants.py +++ b/src/constants.py @@ -31,4 +31,4 @@ # Zookeeper literals HA_ZNODE_NAME = "/kyuubi" -HA_ZNODE_NAME_TEMP = "/kyuubi-temp" \ No newline at end of file +HA_ZNODE_NAME_TEMP = "/kyuubi-temp" diff --git a/src/core/context.py b/src/core/context.py index c2b8ef7..84c9651 100644 --- a/src/core/context.py +++ b/src/core/context.py @@ -42,6 +42,10 @@ def __init__(self, model: Model, config: ConfigData): database_name=AUTHENTICATION_DATABASE_NAME, extra_user_roles="superuser", ) + + # FIXME: The database_name currently requested is a dummy name + # This should be replaced with the name of actual znode when znode created + # by zookeeper charm has enough permissions for Kyuubi to work self.zookeeper_requirer_data = DatabaseRequirerData( self.model, ZOOKEEPER_REL, database_name=HA_ZNODE_NAME_TEMP ) diff --git a/src/core/domain.py b/src/core/domain.py index 1324fe0..d026f91 100644 --- a/src/core/domain.py +++ b/src/core/domain.py @@ -34,6 +34,7 @@ class Status(Enum): MISSING_INTEGRATION_HUB = BlockedStatus("Missing integration hub relation") INVALID_NAMESPACE = BlockedStatus("Invalid config option: namespace") INVALID_SERVICE_ACCOUNT = BlockedStatus("Invalid config option: service-account") + WAITING_ZOOKEEPER = MaintenanceStatus("Waiting for zookeeper credentials") ACTIVE = ActiveStatus("") diff --git a/src/events/actions.py b/src/events/actions.py index dbb83ba..9304671 100644 --- a/src/events/actions.py +++ b/src/events/actions.py @@ -7,7 +7,7 @@ from ops import CharmBase from ops.charm import ActionEvent -from constants import DEFAULT_ADMIN_USERNAME, JDBC_PORT +from constants import DEFAULT_ADMIN_USERNAME, HA_ZNODE_NAME, JDBC_PORT from core.context import Context from core.domain import Status from core.workload import KyuubiWorkloadBase @@ -45,7 +45,9 @@ def _on_get_jdbc_endpoint(self, event: ActionEvent): if self.context.is_ha_enabled(): address = self.context.zookeeper.uris - namespace = self.context.zookeeper.database + # FIXME: Get this value from self.context.zookeeper.uris when znode created by + # zookeeper charm has enough permissions for Kyuubi to work + namespace = HA_ZNODE_NAME if not address.endswith("/"): address += "/" endpoint = f"jdbc:hive2://{address};serviceDiscoveryMode=zooKeeper;zooKeeperNamespace={namespace}" diff --git a/src/events/base.py b/src/events/base.py index a87507d..3ccd63e 100644 --- a/src/events/base.py +++ b/src/events/base.py @@ -59,6 +59,9 @@ def get_app_status( if not k8s_manager.is_service_account_valid(): return Status.INVALID_SERVICE_ACCOUNT.value + if self.context._zookeeper_relation and not self.context.zookeeper: + return Status.WAITING_ZOOKEEPER.value + return Status.ACTIVE.value diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c3c93d4..6bafefe 
100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -95,6 +95,7 @@ def charm_versions() -> IntegrationTestsCharms: "channel": "3/edge", "series": "jammy", "alias": "zookeeper", + "num_units": 3, } ), ) @@ -150,14 +151,15 @@ def s3_bucket_and_creds(): @pytest.fixture(scope="module") -def test_pod(): +def test_pod(ops_test): logger.info("Preparing test pod fixture...") kyuubi_image = METADATA["resources"]["kyuubi-image"]["upstream-source"] + namespace = ops_test.model_name with open(TEST_POD_SPEC_FILE) as tf: template = Template(tf.read()) - pod_spec = template.substitute(kyuubi_image=kyuubi_image) + pod_spec = template.substitute(kyuubi_image=kyuubi_image, namespace=namespace) # Create test pod by applying pod spec apply_result = subprocess.run( @@ -169,7 +171,17 @@ def test_pod(): # Wait until the pod is in ready state wait_result = subprocess.run( - ["kubectl", "wait", "--for", "condition=Ready", f"pod/{pod_name}", "--timeout", "60s"] + [ + "kubectl", + "wait", + "--for", + "condition=Ready", + f"pod/{pod_name}", + "-n", + namespace, + "--timeout", + "60s", + ] ) assert wait_result.returncode == 0 @@ -178,5 +190,7 @@ def test_pod(): # Cleanup by deleting the pod that was creatd logger.info("Deleting test pod fixture...") - delete_result = subprocess.run(["kubectl", "delete", "pod", pod_name], check=True) + delete_result = subprocess.run( + ["kubectl", "delete", "pod", "-n", namespace, pod_name], check=True + ) assert delete_result.returncode == 0 diff --git a/tests/integration/setup/testpod_spec.yaml.template b/tests/integration/setup/testpod_spec.yaml.template index f802e8d..0e4a44d 100644 --- a/tests/integration/setup/testpod_spec.yaml.template +++ b/tests/integration/setup/testpod_spec.yaml.template @@ -5,6 +5,7 @@ apiVersion: v1 kind: Pod metadata: name: testpod + namespace: ${namespace} spec: containers: - image: ${kyuubi_image} diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 6bce5a3..9a707c5 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -3,6 +3,7 @@ # See LICENSE file for licensing details. 
import logging +import re import subprocess import time import uuid @@ -18,6 +19,7 @@ from constants import ( AUTHENTICATION_DATABASE_NAME, + HA_ZNODE_NAME, KYUUBI_CLIENT_RELATION_NAME, METASTORE_DATABASE_NAME, ) @@ -209,6 +211,7 @@ async def test_jdbc_endpoint_with_default_metastore(ops_test: OpsTest, test_pod) [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, "db_default_metastore", "table_default_metastore", @@ -270,6 +273,7 @@ async def test_jdbc_endpoint_with_postgres_metastore(ops_test: OpsTest, test_pod [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, "db_postgres_metastore", "table_postgres_metastore", @@ -357,6 +361,7 @@ async def test_jdbc_endpoint_after_removing_postgresql_metastore( [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, "db_default_metastore_2", "table_default_metastore_2", @@ -450,6 +455,7 @@ async def test_jdbc_endpoint_no_credentials(ops_test: OpsTest, test_pod): [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, "db_111", "table_111", @@ -487,6 +493,7 @@ async def test_jdbc_endpoint_invalid_credentials(ops_test: OpsTest, test_pod): [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, "db_222", "table_222", @@ -535,6 +542,7 @@ async def test_jdbc_endpoint_valid_credentials(ops_test: OpsTest, test_pod): [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, "db_333", "table_333", @@ -596,6 +604,7 @@ async def test_set_password_action(ops_test: OpsTest, test_pod): [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, "db_444", "table_444", @@ -706,6 +715,7 @@ async def test_kyuubi_client_relation_joined(ops_test: OpsTest, test_pod, charm_ [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, "db_666", "tbl_666", @@ -805,6 +815,7 @@ async def test_kyuubi_client_relation_removed(ops_test: OpsTest, test_pod, charm [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, "db_777", "tbl_777", @@ -823,7 +834,55 @@ async def test_kyuubi_client_relation_removed(ops_test: OpsTest, test_pod, charm @pytest.mark.abort_on_fail -async def test_integration_with_zookeeper(ops_test: OpsTest, charm_versions): +async def test_remove_authentication(ops_test: OpsTest, test_pod, charm_versions): + """Test the JDBC connection when authentication is disabled.""" + logger.info("Removing relation between postgresql-k8s and kyuubi-k8s over auth-db endpoint...") + await ops_test.model.applications[APP_NAME].remove_relation( + f"{APP_NAME}:auth-db", f"{charm_versions.postgres.application_name}:database" + ) + + logger.info("Waiting for postgresql-k8s and kyuubi-k8s apps to be idle and active...") + await ops_test.model.wait_for_idle( + apps=[APP_NAME, charm_versions.postgres.application_name], timeout=1000, status="active" + ) + + logger.info( + "Waiting for extra 30 seconds as cool-down period before proceeding with the test..." 
+    )
+    time.sleep(30)
+
+    logger.info("Running action 'get-jdbc-endpoint' on kyuubi-k8s unit...")
+    kyuubi_unit = ops_test.model.applications[APP_NAME].units[0]
+    action = await kyuubi_unit.run_action(
+        action_name="get-jdbc-endpoint",
+    )
+    result = await action.wait()
+
+    jdbc_endpoint = result.results.get("endpoint")
+    logger.info(f"JDBC endpoint: {jdbc_endpoint}")
+
+    logger.info("Testing JDBC endpoint by connecting with beeline with no credentials ...")
+    process = subprocess.run(
+        [
+            "./tests/integration/test_jdbc_endpoint.sh",
+            test_pod,
+            ops_test.model_name,
+            jdbc_endpoint,
+            "db_555",
+            "table_555",
+        ],
+        capture_output=True,
+    )
+    print("========== test_jdbc_endpoint.sh STDOUT =================")
+    print(process.stdout.decode())
+    print("========== test_jdbc_endpoint.sh STDERR =================")
+    print(process.stderr.decode())
+    logger.info(f"JDBC endpoint test returned with status {process.returncode}")
+    assert process.returncode == 0
+
+
+@pytest.mark.abort_on_fail
+async def test_integration_with_zookeeper(ops_test: OpsTest, test_pod, charm_versions):
     """Test the charm by integrating it with Zookeeper."""
     # Deploy the charm and wait for waiting status
     logger.info("Deploying zookeeper-k8s charm...")
@@ -837,16 +896,46 @@ async def test_integration_with_zookeeper(ops_test: OpsTest, charm_versions):
     logger.info("Integrating kyuubi charm with zookeeper charm...")
     await ops_test.model.integrate(charm_versions.zookeeper.application_name, APP_NAME)
 
-    logger.info("Waiting for zookeeper-k8s and kyuubi charms to be idle...")
+    logger.info("Waiting for zookeeper-k8s and kyuubi charms to be idle and active...")
     await ops_test.model.wait_for_idle(
-        apps=[APP_NAME, charm_versions.s3.application_name], timeout=1000
+        apps=[APP_NAME, charm_versions.s3.application_name], timeout=1000, status="active"
     )
 
-    # Assert that both kyuubi-k8s and zookeeper-k8s charms are in active state
-    assert check_status(ops_test.model.applications[APP_NAME], Status.ACTIVE.value)
-    assert (
-        ops_test.model.applications[charm_versions.zookeeper.application_name].status == "active"
+    logger.info("Running action 'get-jdbc-endpoint' on kyuubi-k8s unit...")
+    kyuubi_unit = ops_test.model.applications[APP_NAME].units[0]
+    action = await kyuubi_unit.run_action(
+        action_name="get-jdbc-endpoint",
+    )
+    result = await action.wait()
+
+    jdbc_endpoint = result.results.get("endpoint")
+    logger.info(f"JDBC endpoint: {jdbc_endpoint}")
+
+    assert "serviceDiscoveryMode=zooKeeper" in jdbc_endpoint
+    assert f"zooKeeperNamespace={HA_ZNODE_NAME}" in jdbc_endpoint
+    assert re.match(
+        r"jdbc:hive2://(.*),(.*),(.*)/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=.*",
+        jdbc_endpoint,
+    )
+
+    logger.info("Testing JDBC endpoint by connecting with beeline with no credentials ...")
+    process = subprocess.run(
+        [
+            "./tests/integration/test_jdbc_endpoint.sh",
+            test_pod,
+            ops_test.model_name,
+            jdbc_endpoint,
+            "db_999",
+            "table_999",
+        ],
+        capture_output=True,
     )
+    print("========== test_jdbc_endpoint.sh STDOUT =================")
+    print(process.stdout.decode())
+    print("========== test_jdbc_endpoint.sh STDERR =================")
+    print(process.stderr.decode())
+    logger.info(f"JDBC endpoint test returned with status {process.returncode}")
+    assert process.returncode == 0
 
 
 @pytest.mark.abort_on_fail
@@ -862,31 +951,6 @@ async def test_remove_zookeeper_relation(ops_test: OpsTest, test_pod, charm_vers
         apps=[APP_NAME, charm_versions.zookeeper.application_name], timeout=1000, status="active"
     )
 
-    # Assert that both kyuubi-k8s and
zookeeper-k8s charms are in active state - assert check_status(ops_test.model.applications[APP_NAME], Status.ACTIVE.value) - assert ( - ops_test.model.applications[charm_versions.zookeeper.application_name].status == "active" - ) - - -@pytest.mark.abort_on_fail -async def test_remove_authentication(ops_test: OpsTest, test_pod, charm_versions): - """Test the JDBC connection when authentication is disabled.""" - logger.info("Removing relation between postgresql-k8s and kyuubi-k8s over auth-db endpoint...") - await ops_test.model.applications[APP_NAME].remove_relation( - f"{APP_NAME}:auth-db", f"{charm_versions.postgres.application_name}:database" - ) - - logger.info("Waiting for postgresql-k8s and kyuubi-k8s apps to be idle and active...") - await ops_test.model.wait_for_idle( - apps=[APP_NAME, charm_versions.postgres.application_name], timeout=1000, status="active" - ) - - logger.info( - "Waiting for extra 30 seconds as cool-down period before proceeding with the test..." - ) - time.sleep(30) - logger.info("Running action 'get-jdbc-endpoint' on kyuubi-k8s unit...") kyuubi_unit = ops_test.model.applications[APP_NAME].units[0] action = await kyuubi_unit.run_action( @@ -897,14 +961,22 @@ async def test_remove_authentication(ops_test: OpsTest, test_pod, charm_versions jdbc_endpoint = result.results.get("endpoint") logger.info(f"JDBC endpoint: {jdbc_endpoint}") + assert "serviceDiscoveryMode=zooKeeper" not in jdbc_endpoint + assert f"zooKeeperNamespace={HA_ZNODE_NAME}" not in jdbc_endpoint + assert not re.match( + r"jdbc:hive2://(.*),(.*),(.*)/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=.*", + jdbc_endpoint, + ) + logger.info("Testing JDBC endpoint by connecting with beeline with no credentials ...") process = subprocess.run( [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, - "db_555", - "table_555", + "db_101010", + "table_101010", ], capture_output=True, ) @@ -971,6 +1043,7 @@ async def test_read_spark_properties_from_secrets(ops_test: OpsTest, test_pod): [ "./tests/integration/test_jdbc_endpoint.sh", test_pod, + ops_test.model_name, jdbc_endpoint, "db_888", "table_888", diff --git a/tests/integration/test_jdbc_endpoint.sh b/tests/integration/test_jdbc_endpoint.sh index 8e46bf8..d307ce0 100755 --- a/tests/integration/test_jdbc_endpoint.sh +++ b/tests/integration/test_jdbc_endpoint.sh @@ -3,21 +3,22 @@ # See LICENSE file for licensing details. POD_NAME=$1 -JDBC_ENDPOINT=$2 -DB_NAME=${3:-testdb} -TABLE_NAME=${4:-testtable} -USERNAME=${5:-} -PASSWORD=${6:-} +NAMESPACE=${2:-default} +JDBC_ENDPOINT=$3 +DB_NAME=${4:-testdb} +TABLE_NAME=${5:-testtable} +USERNAME=${6:-} +PASSWORD=${7:-} SQL_COMMANDS=$(cat ./tests/integration/setup/test.sql | sed "s/db_name/$DB_NAME/g" | sed "s/table_name/$TABLE_NAME/g") if [ -z "${USERNAME}" ]; then - echo -e "$(kubectl exec $POD_NAME -- \ + echo -e "$(kubectl exec $POD_NAME -n $NAMESPACE -- \ env CMDS="$SQL_COMMANDS" ENDPOINT="$JDBC_ENDPOINT" \ /bin/bash -c 'echo "$CMDS" | /opt/kyuubi/bin/beeline -u $ENDPOINT' )" > /tmp/test_beeline.out else - echo -e "$(kubectl exec $POD_NAME -- \ + echo -e "$(kubectl exec $POD_NAME -n $NAMESPACE -- \ env CMDS="$SQL_COMMANDS" ENDPOINT="$JDBC_ENDPOINT" USER="$USERNAME" PASSWD="$PASSWORD"\ /bin/bash -c 'echo "$CMDS" | /opt/kyuubi/bin/beeline -u $ENDPOINT -n $USER -p $PASSWD' )" > /tmp/test_beeline.out
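One practical consequence of the script change above: test_jdbc_endpoint.sh now takes the Kubernetes namespace as its second argument, which is why every test updated in this patch passes ops_test.model_name between the pod name and the JDBC endpoint. A minimal sketch of the new calling convention from Python; the pod name, namespace and endpoint below are placeholders:

import subprocess

# Argument order after this patch:
#   <pod> <namespace> <jdbc-endpoint> [db-name] [table-name] [username] [password]
process = subprocess.run(
    [
        "./tests/integration/test_jdbc_endpoint.sh",
        "testpod",                       # placeholder pod name
        "test-kyuubi",                   # placeholder namespace (ops_test.model_name in the tests)
        "jdbc:hive2://10.1.0.5:10009/",  # placeholder endpoint
        "demo_db",
        "demo_table",
    ],
    capture_output=True,
)
assert process.returncode == 0, process.stderr.decode()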