Skip to content

Commit

Permalink
Updates to use new storage.Path type
Browse files Browse the repository at this point in the history
These changes refactor the storage layer to use storage.Path instead of ast.Ref
for Read/Write/Begin/Unmount/Mount operations.

Previously, the storage layer used ast.Ref values to refer to locations in
storage. Use of ast.Ref introduced unnecessary complexity for storage plugins
as they had to be aware of various details (e.g., array indices specified as
ast.Number/float64 values, potentially nested references, etc.) that were
unnecessary given that ast.Ref values passed to the storage layer were
intended represent JSON pointers.

These changes also remove the need for storage plugins to be aware of where
they are mounted. That is, the paths passed to the read call will be relative
to the mount point.

The remaining dependencies on the ast package from the storage package are for
(1) indexing and (2) policy storage. It may be possible to further decouple
these packages by revisiting how indexing is done and treating policies as
blobs.

Fixes #159
  • Loading branch information
tsandall committed Dec 2, 2016
1 parent 75cee19 commit 26cd8e5
Show file tree
Hide file tree
Showing 18 changed files with 488 additions and 553 deletions.
10 changes: 7 additions & 3 deletions repl/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/open-policy-agent/opa/topdown"
"github.com/open-policy-agent/opa/topdown/explain"
"github.com/open-policy-agent/opa/version"

"github.com/peterh/liner"
)

Expand Down Expand Up @@ -1081,7 +1080,7 @@ func singleValue(body ast.Body) bool {
}

func dumpStorage(store *storage.Storage, txn storage.Transaction, w io.Writer) error {
data, err := store.Read(txn, ast.Ref{ast.DefaultRootDocument})
data, err := store.Read(txn, storage.Path{})
if err != nil {
return err
}
Expand All @@ -1105,8 +1104,13 @@ func mangleEvent(store *storage.Storage, txn storage.Transaction, event *topdown
var err error
event.Locals.Iter(func(k, v ast.Value) bool {
if r, ok := v.(ast.Ref); ok {
var path storage.Path
path, err = storage.NewPathForRef(r)
if err != nil {
return true
}
var doc interface{}
doc, err = store.Read(txn, r)
doc, err = store.Read(txn, path)
if err != nil {
return true
}
Expand Down
6 changes: 2 additions & 4 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ func (rt *Runtime) init(params *Params) error {

defer store.Close(txn)

ref := ast.Ref{ast.DefaultRootDocument}
if err := store.Write(txn, storage.AddOp, ref, loaded.Documents); err != nil {
if err := store.Write(txn, storage.AddOp, storage.Path{}, loaded.Documents); err != nil {
return errors.Wrapf(err, "storage error")
}

Expand Down Expand Up @@ -240,8 +239,7 @@ func (rt *Runtime) processWatcherUpdate(paths []string) error {

defer rt.Store.Close(txn)

ref := ast.Ref{ast.DefaultRootDocument}
if err := rt.Store.Write(txn, storage.AddOp, ref, loaded.Documents); err != nil {
if err := rt.Store.Write(txn, storage.AddOp, storage.Path{}, loaded.Documents); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestInit(t *testing.T) {

txn := storage.NewTransactionOrDie(rt.Store)

node, err := rt.Store.Read(txn, ast.MustParseRef("data.foo"))
node, err := rt.Store.Read(txn, storage.MustParsePath("/foo"))
if util.Compare(node, "bar") != 0 || err != nil {
t.Errorf("Expected %v but got %v (err: %v)", "bar", node, err)
return
Expand Down
129 changes: 94 additions & 35 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const compileQueryErrMsg = "error(s) occurred while compiling query, see Errors"
// attempts to modify a virtual document or create a document at a path that
// conflicts with an existing document.
type WriteConflictError struct {
path ast.Ref
path storage.Path
}

func (err WriteConflictError) Error() string {
Expand All @@ -72,6 +72,26 @@ func IsWriteConflict(err error) bool {
return ok
}

type badRequestError string

// isBadRequest reqturns true if the error indicates a badly formatted request.
func isBadRequest(err error) bool {
_, ok := err.(badRequestError)
return ok
}

func (err badRequestError) Error() string {
return string(err)
}

func badPatchOperationError(op string) badRequestError {
return badRequestError(fmt.Sprintf("bad patch operation: %v", op))
}

func badPatchPathError(path string) badRequestError {
return badRequestError(fmt.Sprintf("bad patch path: %v", path))
}

// patchV1 models a single patch operation against a document.
type patchV1 struct {
Op string `json:"op"`
Expand Down Expand Up @@ -246,6 +266,12 @@ func newBindingsV1(locals *ast.ValueMap) (result []*bindingV1) {
return result
}

type patchImpl struct {
path storage.Path
op storage.PatchOp
value interface{}
}

// Server represents an instance of OPA running in server mode.
type Server struct {
Handler http.Handler
Expand Down Expand Up @@ -504,7 +530,6 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) {

func (s *Server) v1DataPatch(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
root := stringPathToDataRef(vars["path"])

ops := []patchV1{}
if err := json.NewDecoder(r.Body).Decode(&ops); err != nil {
Expand All @@ -520,32 +545,14 @@ func (s *Server) v1DataPatch(w http.ResponseWriter, r *http.Request) {

defer s.store.Close(txn)

for i := range ops {

var op storage.PatchOp

// TODO this could be refactored for failure handling
switch ops[i].Op {
case "add":
op = storage.AddOp
case "remove":
op = storage.RemoveOp
case "replace":
op = storage.ReplaceOp
default:
handleErrorf(w, 400, "bad patch operation: %v", ops[i].Op)
return
}

path := root
path = append(path, stringPathToRef(ops[i].Path)...)

if err := s.writeConflict(op, path); err != nil {
handleErrorAuto(w, err)
return
}
patches, err := s.prepareV1PatchSlice(vars["path"], ops)
if err != nil {
handleErrorAuto(w, err)
return
}

if err := s.store.Write(txn, op, path, ops[i].Value); err != nil {
for _, patch := range patches {
if err := s.store.Write(txn, patch.op, patch.path, patch.value); err != nil {
handleErrorAuto(w, err)
return
}
Expand All @@ -571,10 +578,11 @@ func (s *Server) v1DataPut(w http.ResponseWriter, r *http.Request) {

defer s.store.Close(txn)

// The path route variable contains the path portion *after* /v1/data so we
// prepend the global root document here.
path := ast.Ref{ast.DefaultRootDocument}
path = append(path, stringPathToRef(vars["path"])...)
path, ok := storage.ParsePath("/" + strings.Trim(vars["path"], "/"))
if !ok {
handleErrorf(w, 400, "bad path format %v", vars["path"])
return
}

_, err = s.store.Read(txn, path)

Expand Down Expand Up @@ -825,7 +833,7 @@ func (s *Server) setCompiler(compiler *ast.Compiler) {
s.compiler = compiler
}

func (s *Server) makeDir(txn storage.Transaction, path ast.Ref) error {
func (s *Server) makeDir(txn storage.Transaction, path storage.Path) error {

node, err := s.store.Read(txn, path)
if err == nil {
Expand All @@ -850,14 +858,61 @@ func (s *Server) makeDir(txn storage.Transaction, path ast.Ref) error {
return s.store.Write(txn, storage.AddOp, path, map[string]interface{}{})
}

func (s *Server) prepareV1PatchSlice(root string, ops []patchV1) (result []patchImpl, err error) {

root = "/" + strings.Trim(root, "/")

for _, op := range ops {
impl := patchImpl{
value: op.Value,
}

// Map patch operation.
switch op.Op {
case "add":
impl.op = storage.AddOp
case "remove":
impl.op = storage.RemoveOp
case "replace":
impl.op = storage.ReplaceOp
default:
return nil, badPatchOperationError(op.Op)
}

// Construct patch path.
path := strings.Trim(op.Path, "/")
if len(path) > 0 {
path = root + "/" + path
} else {
path = root
}

var ok bool
impl.path, ok = storage.ParsePath(path)
if !ok {
return nil, badPatchPathError(op.Path)
}

if err := s.writeConflict(impl.op, impl.path); err != nil {
return nil, err
}

result = append(result, impl)
}

return result, nil
}

// TODO(tsandall): this ought to be enforced by the storage layer.
func (s *Server) writeConflict(op storage.PatchOp, path ast.Ref) error {
func (s *Server) writeConflict(op storage.PatchOp, path storage.Path) error {

if op == storage.AddOp && path[len(path)-1].Value.Equal(ast.String("-")) {
if op == storage.AddOp && len(path) > 0 && path[len(path)-1] == "-" {
path = path[:len(path)-1]
}

if rs := s.Compiler().GetRulesForVirtualDocument(path); rs != nil {
ref := path.Ref(ast.DefaultRootDocument)

if rs := s.Compiler().GetRulesForVirtualDocument(ref); rs != nil {
return WriteConflictError{path}
}

Expand Down Expand Up @@ -908,6 +963,10 @@ func handleErrorAuto(w http.ResponseWriter, err error) {
handleError(w, 404, err)
return
}
if isBadRequest(curr) {
handleError(w, http.StatusBadRequest, err)
return
}
if storage.IsInvalidPatch(curr) {
handleError(w, 400, err)
return
Expand Down
12 changes: 6 additions & 6 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ func TestDataV1(t *testing.T) {
tr{"PUT", "/data/a/b", `[1,2,3,4]`, 204, ""},
tr{"PUT", "/data/a/b/c/d", "0", 404, `{
"Code": 404,
"Message": "write conflict: data.a.b"
"Message": "write conflict: /a/b"
}`},
}},
{"put virtual write conflict", []tr{
tr{"PUT", "/policies/test", testMod2, 200, ""},
tr{"PUT", "/data/testmod/q/x", "0", 404, `{
"Code": 404,
"Message": "write conflict: data.testmod.q"
"Message": "write conflict: /testmod/q"
}`},
}},
{"get virtual", []tr{
Expand All @@ -161,7 +161,7 @@ func TestDataV1(t *testing.T) {
tr{"PUT", "/policies/test", testMod1, 200, ""},
tr{"PATCH", "/data/testmod/p", `[{"op": "add", "path": "-", "value": 1}]`, 404, `{
"Code": 404,
"Message": "write conflict: data.testmod.p"
"Message": "write conflict: /testmod/p"
}`},
}},
{"get with global", []tr{
Expand Down Expand Up @@ -665,7 +665,7 @@ func (queryBindingErrStore) ID() string {
return "mock"
}

func (s *queryBindingErrStore) Read(txn storage.Transaction, ref ast.Ref) (interface{}, error) {
func (s *queryBindingErrStore) Read(txn storage.Transaction, path storage.Path) (interface{}, error) {
// At this time, the store will receive two reads:
// - The first during evaluation
// - The second when the server tries to accumulate the bindings
Expand All @@ -676,7 +676,7 @@ func (s *queryBindingErrStore) Read(txn storage.Transaction, ref ast.Ref) (inter
return "", nil
}

func (queryBindingErrStore) Begin(txn storage.Transaction, refs []ast.Ref) error {
func (queryBindingErrStore) Begin(txn storage.Transaction, params storage.TransactionParams) error {
return nil
}

Expand All @@ -689,7 +689,7 @@ func TestQueryBindingIterationError(t *testing.T) {
store := storage.New(storage.InMemoryConfig())
mock := &queryBindingErrStore{}

if err := store.Mount(mock, ast.MustParseRef("data.foo.bar")); err != nil {
if err := store.Mount(mock, storage.MustParsePath("/foo/bar")); err != nil {
panic(err)
}

Expand Down
Loading

0 comments on commit 26cd8e5

Please sign in to comment.