Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(key-value): use flush instead of commit #29286

Merged
merged 17 commits into from
Jun 20, 2024
Merged
2 changes: 2 additions & 0 deletions superset/commands/dashboard/permalink/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.dashboard.permalink.base import BaseDashboardPermalinkCommand
from superset.commands.key_value.upsert import UpsertKeyValueCommand
from superset.daos.dashboard import DashboardDAO
Expand Down Expand Up @@ -62,6 +63,7 @@
codec=self.codec,
).run()
assert key.id # for type checks
db.session.commit()

Check warning on line 66 in superset/commands/dashboard/permalink/create.py

View check run for this annotation

Codecov / codecov/patch

superset/commands/dashboard/permalink/create.py#L66

Added line #L66 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the UpsertKeyValueCommand not commit? Note that hopefully (if merged) that #24969 should remove the need to have to commit in various places given it violates the "unit of work" philosophy.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I'm proposing to not commit in these commands, as we have cases where we want to chain multiple commands together. But if we consider each Key Value command a unit of work, then we should naturally commit there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think there's a case to be made for both. Maybe we can have default behavior be to commit but provide an option to skip for use cases where there's a command chaining multiple sub-commands? It would be inconsistent for some commands to commit while others requiring the caller to commit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this has previously been considered an antipattern (=having commit: bool flag in these types of methods or similar), I'm personally also kind of leaning in that direction. This would make it possible to use all existing commands both as full units of work, or as part of a bigger chain, with minimal code duplication. Thoughts @john-bodley ? Also pinging @michael-s-molina as you've worked on these components in the past.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SIP-99B will handle the chaining of commands (if necessary) via nested sessions where only the outermost session commits.

I think this logic is fine for now, though @villebro some of it maybe updated if/when I get my SIP-99B through.

Copy link
Member

@michael-s-molina michael-s-molina Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would make it possible to use all existing commands both as full units of work, or as part of a bigger chain, with minimal code duplication

We'll achieve that with nested transactions using begin_nested. Check #24969 for reference.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll achieve that with nested transactions using being_nested. Check #24969 for reference.

@michael-s-molina I read up on the docs I could find, and I agree, this should be an elegant solution to this dual use case.

@john-bodley it would be great if you could add a description to #24969 so we can start reviewing/testing it. I'm keen on getting this important refactor in, as it will have a profound impact on the general quality and perormance of the backend.

return encode_permalink_key(key=key.id, salt=self.salt)
except KeyValueCodecEncodeException as ex:
raise DashboardPermalinkCreateFailedError(str(ex)) from ex
Expand Down
2 changes: 2 additions & 0 deletions superset/commands/explore/permalink/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.explore.permalink.base import BaseExplorePermalinkCommand
from superset.commands.key_value.create import CreateKeyValueCommand
from superset.explore.permalink.exceptions import ExplorePermalinkCreateFailedError
Expand Down Expand Up @@ -58,6 +59,7 @@
key = command.run()
if key.id is None:
raise ExplorePermalinkCreateFailedError("Unexpected missing key id")
db.session.commit()

Check warning on line 62 in superset/commands/explore/permalink/create.py

View check run for this annotation

Codecov / codecov/patch

superset/commands/explore/permalink/create.py#L62

Added line #L62 was not covered by tests
return encode_permalink_key(key=key.id, salt=self.salt)
except KeyValueCodecEncodeException as ex:
raise ExplorePermalinkCreateFailedError(str(ex)) from ex
Expand Down
2 changes: 1 addition & 1 deletion superset/commands/key_value/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,5 @@ def create(self) -> Key:
except ValueError as ex:
raise KeyValueCreateFailedError() from ex
db.session.add(entry)
db.session.commit()
db.session.flush()
return Key(id=entry.id, uuid=entry.uuid)
2 changes: 1 addition & 1 deletion superset/commands/key_value/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ def delete(self) -> bool:
filter_ = get_filter(self.resource, self.key)
if entry := db.session.query(KeyValueEntry).filter_by(**filter_).first():
db.session.delete(entry)
db.session.commit()
db.session.flush()
return True
return False
2 changes: 1 addition & 1 deletion superset/commands/key_value/delete_expired.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ def delete_expired(self) -> None:
)
.delete()
)
db.session.commit()
db.session.flush()
3 changes: 1 addition & 2 deletions superset/commands/key_value/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.

import logging
from datetime import datetime
from typing import Any, Optional, Union
from uuid import UUID

Expand Down Expand Up @@ -67,6 +66,6 @@
def get(self) -> Optional[Any]:
filter_ = get_filter(self.resource, self.key)
entry = db.session.query(KeyValueEntry).filter_by(**filter_).first()
if entry and (entry.expires_on is None or entry.expires_on > datetime.now()):
if entry and not entry.is_expired():

Check warning on line 69 in superset/commands/key_value/get.py

View check run for this annotation

Codecov / codecov/patch

superset/commands/key_value/get.py#L69

Added line #L69 was not covered by tests
return self.codec.decode(entry.value)
return None
2 changes: 1 addition & 1 deletion superset/commands/key_value/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
entry.expires_on = self.expires_on
entry.changed_on = datetime.now()
entry.changed_by_fk = get_user_id()
db.session.commit()
db.session.flush()

Check warning on line 87 in superset/commands/key_value/update.py

View check run for this annotation

Codecov / codecov/patch

superset/commands/key_value/update.py#L87

Added line #L87 was not covered by tests
return Key(id=entry.id, uuid=entry.uuid)

return None
2 changes: 1 addition & 1 deletion superset/commands/key_value/upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
entry.expires_on = self.expires_on
entry.changed_on = datetime.now()
entry.changed_by_fk = get_user_id()
db.session.commit()
db.session.flush()

Check warning on line 91 in superset/commands/key_value/upsert.py

View check run for this annotation

Codecov / codecov/patch

superset/commands/key_value/upsert.py#L91

Added line #L91 was not covered by tests
return Key(entry.id, entry.uuid)

return CreateKeyValueCommand(
Expand Down
9 changes: 7 additions & 2 deletions superset/extensions/metastore_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from flask import current_app, Flask, has_app_context
from flask_caching import BaseCache

from superset import db
from superset.key_value.exceptions import KeyValueCreateFailedError
from superset.key_value.types import (
KeyValueCodec,
Expand Down Expand Up @@ -94,21 +95,23 @@
codec=self.codec,
expires_on=self._get_expiry(timeout),
).run()
db.session.commit()

Check warning on line 98 in superset/extensions/metastore_cache.py

View check run for this annotation

Codecov / codecov/patch

superset/extensions/metastore_cache.py#L98

Added line #L98 was not covered by tests
return True

def add(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
# pylint: disable=import-outside-toplevel
from superset.commands.key_value.create import CreateKeyValueCommand

try:
self._prune()

Check warning on line 106 in superset/extensions/metastore_cache.py

View check run for this annotation

Codecov / codecov/patch

superset/extensions/metastore_cache.py#L106

Added line #L106 was not covered by tests
CreateKeyValueCommand(
resource=RESOURCE,
value=value,
codec=self.codec,
key=self.get_key(key),
expires_on=self._get_expiry(timeout),
).run()
self._prune()
db.session.commit()

Check warning on line 114 in superset/extensions/metastore_cache.py

View check run for this annotation

Codecov / codecov/patch

superset/extensions/metastore_cache.py#L114

Added line #L114 was not covered by tests
return True
except KeyValueCreateFailedError:
return False
Expand All @@ -133,4 +136,6 @@
# pylint: disable=import-outside-toplevel
from superset.commands.key_value.delete import DeleteKeyValueCommand

return DeleteKeyValueCommand(resource=RESOURCE, key=self.get_key(key)).run()
ret = DeleteKeyValueCommand(resource=RESOURCE, key=self.get_key(key)).run()
db.session.commit()
return ret

Check warning on line 141 in superset/extensions/metastore_cache.py

View check run for this annotation

Codecov / codecov/patch

superset/extensions/metastore_cache.py#L139-L141

Added lines #L139 - L141 were not covered by tests
5 changes: 5 additions & 0 deletions superset/key_value/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime

from flask_appbuilder import Model
from sqlalchemy import Column, DateTime, ForeignKey, Integer, LargeBinary, String
from sqlalchemy.orm import relationship
Expand All @@ -38,3 +40,6 @@ class KeyValueEntry(AuditMixinNullable, ImportExportMixin, Model):
changed_by_fk = Column(Integer, ForeignKey("ab_user.id"), nullable=True)
created_by = relationship(security_manager.user_model, foreign_keys=[created_by_fk])
changed_by = relationship(security_manager.user_model, foreign_keys=[changed_by_fk])

def is_expired(self) -> bool:
return self.expires_on is not None and self.expires_on <= datetime.now()
2 changes: 2 additions & 0 deletions superset/key_value/shared_entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Any, Optional
from uuid import uuid3

from superset import db
from superset.key_value.types import JsonKeyValueCodec, KeyValueResource, SharedKey
from superset.key_value.utils import get_uuid_namespace, random_key

Expand Down Expand Up @@ -45,6 +46,7 @@
key=uuid_key,
codec=CODEC,
).run()
db.session.commit()

Check warning on line 49 in superset/key_value/shared_entries.py

View check run for this annotation

Codecov / codecov/patch

superset/key_value/shared_entries.py#L49

Added line #L49 was not covered by tests


def get_permalink_salt(key: SharedKey) -> str:
Expand Down
13 changes: 10 additions & 3 deletions superset/utils/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
from datetime import datetime, timedelta
from typing import Any, cast, TypeVar, Union

from superset import db
from superset.exceptions import CreateKeyValueDistributedLockFailedException
from superset.key_value.exceptions import KeyValueCreateFailedError
from superset.key_value.types import KeyValueResource, PickleKeyValueCodec
from superset.key_value.types import JsonKeyValueCodec, KeyValueResource
from superset.utils import json

LOCK_EXPIRATION = timedelta(seconds=30)
Expand All @@ -53,6 +54,10 @@ def sort(obj: T) -> T:
return json.dumps(params)


def get_key(namespace: str, **kwargs: Any) -> uuid.UUID:
return uuid.uuid5(uuid.uuid5(uuid.NAMESPACE_DNS, namespace), serialize(kwargs))


@contextmanager
def KeyValueDistributedLock( # pylint: disable=invalid-name
namespace: str,
Expand All @@ -77,21 +82,23 @@ def KeyValueDistributedLock( # pylint: disable=invalid-name
from superset.commands.key_value.delete import DeleteKeyValueCommand
from superset.commands.key_value.delete_expired import DeleteExpiredKeyValueCommand

key = uuid.uuid5(uuid.uuid5(uuid.NAMESPACE_DNS, namespace), serialize(kwargs))
key = get_key(namespace, **kwargs)
logger.debug("Acquiring lock on namespace %s for key %s", namespace, key)
try:
DeleteExpiredKeyValueCommand(resource=KeyValueResource.LOCK).run()
CreateKeyValueCommand(
resource=KeyValueResource.LOCK,
codec=PickleKeyValueCodec(),
codec=JsonKeyValueCodec(),
villebro marked this conversation as resolved.
Show resolved Hide resolved
key=key,
value=True,
expires_on=datetime.now() + LOCK_EXPIRATION,
).run()
db.session.commit()

yield key

DeleteKeyValueCommand(resource=KeyValueResource.LOCK, key=key).run()
db.session.commit()
logger.debug("Removed lock on namespace %s for key %s", namespace, key)
except KeyValueCreateFailedError as ex:
raise CreateKeyValueDistributedLockFailedException(
Expand Down
9 changes: 9 additions & 0 deletions tests/integration_tests/extensions/metastore_cache_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,24 @@ def test_caching_flow(app_context: AppContext, cache: SupersetMetastoreCache) ->
def test_expiry(app_context: AppContext, cache: SupersetMetastoreCache) -> None:
delta = timedelta(days=90)
dttm = datetime(2022, 3, 18, 0, 0, 0)

# 1. initialize cached values, ensure they're found
with freeze_time(dttm):
cache.set(FIRST_KEY, FIRST_KEY_INITIAL_VALUE, int(delta.total_seconds()))
assert cache.get(FIRST_KEY) == FIRST_KEY_INITIAL_VALUE

# 2. ensure cached values are available a moment before expiration
with freeze_time(dttm + delta - timedelta(seconds=1)):
assert cache.has(FIRST_KEY)
assert cache.get(FIRST_KEY) == FIRST_KEY_INITIAL_VALUE

# 3. ensure cached entries expire
with freeze_time(dttm + delta + timedelta(seconds=1)):
assert cache.has(FIRST_KEY) is False
assert cache.get(FIRST_KEY) is None
# adding a value with the same key as an expired entry works
cache.add(FIRST_KEY, SECOND_VALUE, int(delta.total_seconds()))
assert cache.get(FIRST_KEY) == SECOND_VALUE


@pytest.mark.parametrize(
Expand Down
5 changes: 3 additions & 2 deletions tests/integration_tests/key_value/commands/delete_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def key_value_entry() -> KeyValueEntry:
value=bytes(json.dumps(JSON_VALUE), encoding="utf-8"),
)
db.session.add(entry)
db.session.commit()
db.session.flush()
return entry


Expand All @@ -61,6 +61,7 @@ def test_delete_id_entry(
from superset.commands.key_value.delete import DeleteKeyValueCommand

assert DeleteKeyValueCommand(resource=RESOURCE, key=ID_KEY).run() is True
db.session.commit()


def test_delete_uuid_entry(
Expand All @@ -71,12 +72,12 @@ def test_delete_uuid_entry(
from superset.commands.key_value.delete import DeleteKeyValueCommand

assert DeleteKeyValueCommand(resource=RESOURCE, key=UUID_KEY).run() is True
db.session.commit()


def test_delete_entry_missing(
app_context: AppContext,
admin: User, # noqa: F811
key_value_entry: KeyValueEntry,
) -> None:
from superset.commands.key_value.delete import DeleteKeyValueCommand

Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/key_value/commands/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def key_value_entry() -> Generator[KeyValueEntry, None, None]:
value=bytes(json.dumps(JSON_VALUE), encoding="utf-8"),
)
db.session.add(entry)
db.session.commit()
db.session.flush()
yield entry
db.session.delete(entry)
db.session.commit()
Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/key_value/commands/get_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_get_expired_entry(app_context: AppContext) -> None:
expires_on=datetime.now() - timedelta(days=1),
)
db.session.add(entry)
db.session.commit()
db.session.flush()
value = GetKeyValueCommand(resource=RESOURCE, key=ID_KEY, codec=JSON_CODEC).run()
assert value is None
db.session.delete(entry)
Expand All @@ -96,7 +96,7 @@ def test_get_future_expiring_entry(app_context: AppContext) -> None:
expires_on=datetime.now() + timedelta(days=1),
)
db.session.add(entry)
db.session.commit()
db.session.flush()
value = GetKeyValueCommand(resource=RESOURCE, key=id_, codec=JSON_CODEC).run()
assert value == JSON_VALUE
db.session.delete(entry)
Expand Down
1 change: 0 additions & 1 deletion tests/integration_tests/key_value/commands/upsert_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,3 @@ def test_upsert_missing_entry(app_context: AppContext, admin: User) -> None: #
assert key is not None
assert key.id == 456
db.session.query(KeyValueEntry).filter_by(id=456).delete()
db.session.commit()
Loading
Loading