Skip to content

Commit

Permalink
Implemented metadata permissions: property, closes #1636
Browse files Browse the repository at this point in the history
  • Loading branch information
simonw committed Dec 13, 2022
1 parent 8bf06a7 commit c5d30b5
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 11 deletions.
55 changes: 54 additions & 1 deletion datasette/default_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,60 @@ async def inner():


async def _resolve_metadata_permissions_blocks(datasette, actor, action, resource):
# Check custom permissions: blocks - not yet implemented
# Check custom permissions: blocks
metadata = datasette.metadata()
root_block = (metadata.get("permissions", None) or {}).get(action)
if root_block:
root_result = actor_matches_allow(actor, root_block)
if root_result is not None:
return root_result
# Now try database-specific blocks
if not resource:
return None
if isinstance(resource, str):
database = resource
else:
database = resource[0]
database_block = (
(metadata.get("databases", {}).get(database, {}).get("permissions", None)) or {}
).get(action)
if database_block:
database_result = actor_matches_allow(actor, database_block)
if database_result is not None:
return database_result
# Finally try table/query specific blocks
if not isinstance(resource, tuple):
return None
database, table_or_query = resource
table_block = (
(
metadata.get("databases", {})
.get(database, {})
.get("tables", {})
.get(table_or_query, {})
.get("permissions", None)
)
or {}
).get(action)
if table_block:
table_result = actor_matches_allow(actor, table_block)
if table_result is not None:
return table_result
# Finally the canned queries
query_block = (
(
metadata.get("databases", {})
.get(database, {})
.get("queries", {})
.get(table_or_query, {})
.get("permissions", None)
)
or {}
).get(action)
if query_block:
query_result = actor_matches_allow(actor, query_block)
if query_result is not None:
return query_result
return None


Expand Down
151 changes: 141 additions & 10 deletions tests/test_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from bs4 import BeautifulSoup as Soup
import copy
import json
from pprint import pprint
import pytest_asyncio
import pytest
import re
Expand Down Expand Up @@ -645,22 +646,20 @@ async def test_actor_restricted_permissions(

PermMetadataTestCase = collections.namedtuple(
"PermMetadataTestCase",
"metadata,actor,action,resource,default,expected_result",
"metadata,actor,action,resource,expected_result",
)


@pytest.mark.asyncio
@pytest.mark.xfail(reason="Not implemented yet")
@pytest.mark.parametrize(
"metadata,actor,action,resource,default,expected_result",
"metadata,actor,action,resource,expected_result",
(
# Simple view-instance default=True example
PermMetadataTestCase(
metadata={},
actor=None,
action="view-instance",
resource=None,
default=True,
expected_result=True,
),
# debug-menu on root
Expand All @@ -669,21 +668,153 @@ async def test_actor_restricted_permissions(
actor={"id": "user"},
action="debug-menu",
resource=None,
default=False,
expected_result=True,
),
# debug-menu on root, wrong actor
PermMetadataTestCase(
metadata={"permissions": {"debug-menu": {"id": "user"}}},
actor={"id": "user2"},
action="debug-menu",
resource=None,
expected_result=False,
),
# create-table on root
PermMetadataTestCase(
metadata={"permissions": {"create-table": {"id": "user"}}},
actor={"id": "user"},
action="create-table",
resource=None,
expected_result=True,
),
# create-table on database - no resource specified
PermMetadataTestCase(
metadata={
"databases": {"db1": {"permissions": {"create-table": {"id": "user"}}}}
},
actor={"id": "user"},
action="create-table",
resource=None,
expected_result=False,
),
# create-table on database
PermMetadataTestCase(
metadata={
"databases": {"db1": {"permissions": {"create-table": {"id": "user"}}}}
},
actor={"id": "user"},
action="create-table",
resource="db1",
expected_result=True,
),
# insert-row on root, wrong actor
PermMetadataTestCase(
metadata={"permissions": {"insert-row": {"id": "user"}}},
actor={"id": "user2"},
action="insert-row",
resource=("db1", "t1"),
expected_result=False,
),
# insert-row on root, right actor
PermMetadataTestCase(
metadata={"permissions": {"insert-row": {"id": "user"}}},
actor={"id": "user"},
action="insert-row",
resource=("db1", "t1"),
expected_result=True,
),
# insert-row on database
PermMetadataTestCase(
metadata={
"databases": {"db1": {"permissions": {"insert-row": {"id": "user"}}}}
},
actor={"id": "user"},
action="insert-row",
resource="db1",
expected_result=True,
),
# insert-row on table, wrong table
PermMetadataTestCase(
metadata={
"databases": {
"db1": {
"tables": {
"t1": {"permissions": {"insert-row": {"id": "user"}}}
}
}
}
},
actor={"id": "user"},
action="insert-row",
resource=("db1", "t2"),
expected_result=False,
),
# insert-row on table, right table
PermMetadataTestCase(
metadata={
"databases": {
"db1": {
"tables": {
"t1": {"permissions": {"insert-row": {"id": "user"}}}
}
}
}
},
actor={"id": "user"},
action="insert-row",
resource=("db1", "t1"),
expected_result=True,
),
# view-query on canned query, wrong actor
PermMetadataTestCase(
metadata={
"databases": {
"db1": {
"queries": {
"q1": {
"sql": "select 1 + 1",
"permissions": {"view-query": {"id": "user"}},
}
}
}
}
},
actor={"id": "user2"},
action="view-query",
resource=("db1", "q1"),
expected_result=False,
),
# view-query on canned query, right actor
PermMetadataTestCase(
metadata={
"databases": {
"db1": {
"queries": {
"q1": {
"sql": "select 1 + 1",
"permissions": {"view-query": {"id": "user"}},
}
}
}
}
},
actor={"id": "user"},
action="view-query",
resource=("db1", "q1"),
expected_result=True,
),
),
)
async def test_permissions_in_metadata(
perms_ds, metadata, actor, action, resource, default, expected_result
perms_ds, metadata, actor, action, resource, expected_result
):
previous_metadata = perms_ds.metadata()
updated_metadata = copy.deepcopy(previous_metadata)
updated_metadata.update(metadata)
perms_ds._metadata_local = updated_metadata
try:
result = await perms_ds.permission_allowed(
actor, action, resource, default=default
)
assert result == expected_result
result = await perms_ds.permission_allowed(actor, action, resource)
if result != expected_result:
pprint(perms_ds._permission_checks)
assert result == expected_result
finally:
perms_ds._metadata_local = previous_metadata

0 comments on commit c5d30b5

Please sign in to comment.