Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist checkpoints in evaluation history db. Try 2 #4083

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions database/query/eval_history.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ RETURNING id;
INSERT INTO evaluation_statuses(
rule_entity_id,
status,
details
details,
checkpoint
) VALUES (
$1,
$2,
$3
$3,
sqlc.arg(checkpoint)::jsonb
)
RETURNING id;

Expand Down
14 changes: 11 additions & 3 deletions internal/db/eval_history.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions internal/engine/eval_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package engine
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"

Expand Down Expand Up @@ -142,6 +143,12 @@ func (e *executor) createOrUpdateEvalStatus(
alertStatus := evalerrors.ErrorAsAlertStatus(params.GetActionsErr().AlertErr)
e.metrics.CountAlertStatus(ctx, alertStatus)

chckpoint := params.GetIngestResult().GetCheckpoint()
chkpjs, err := chckpoint.ToJSONorDefault(json.RawMessage(`{}`))
if err != nil {
logger.Err(err).Msg("error marshalling checkpoint")
}

// Log result in the evaluation history tables
err = e.querier.WithTransactionErr(func(qtx db.ExtendQuerier) error {
evalID, err := e.historyService.StoreEvaluationStatus(
Expand All @@ -152,6 +159,7 @@ func (e *executor) createOrUpdateEvalStatus(
params.EntityType,
entityID,
params.GetEvalErr(),
chkpjs,
)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion internal/engine/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func TestExecutor_handleEntityEvent(t *testing.T) {
evaluationID := uuid.New()
historyService := mockhistory.NewMockEvaluationHistoryService(ctrl)
historyService.EXPECT().
StoreEvaluationStatus(gomock.Any(), gomock.Any(), ruleInstanceID, profileID, db.EntitiesRepository, repositoryID, gomock.Any()).
StoreEvaluationStatus(
gomock.Any(), gomock.Any(), ruleInstanceID, profileID, db.EntitiesRepository, repositoryID, gomock.Any(), gomock.Any()).
Return(evaluationID, nil)

mockStore.EXPECT().
Expand Down
9 changes: 9 additions & 0 deletions internal/engine/ingester/artifact/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

evalerrors "github.com/stacklok/minder/internal/engine/errors"
engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/entities/checkpoints"
artif "github.com/stacklok/minder/internal/providers/artifact"
"github.com/stacklok/minder/internal/verifier"
"github.com/stacklok/minder/internal/verifier/sigstore/container"
Expand Down Expand Up @@ -115,6 +116,14 @@ func (i *Ingest) Ingest(

return &engif.Result{
Object: applicable,
// We would ideally return an artifact's digest here, but
// the current state of the artifact ingester is actually evaluating
// multiple artifacts at the same time. This is not ideal, ideally
// we should evaluate one impulse at a time. This has to be fixed,
// but for now we return the current time as the checkpoint.
// We need to track the "impulse" that triggered the evaluation
// so we can return the correct checkpoint.
Checkpoint: checkpoints.NewCheckpointV1Now(),
}, nil
}

Expand Down
4 changes: 3 additions & 1 deletion internal/engine/ingester/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

evalerrors "github.com/stacklok/minder/internal/engine/errors"
engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/entities/checkpoints"
"github.com/stacklok/minder/internal/util"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
"github.com/stacklok/minder/pkg/rule_methods"
Expand Down Expand Up @@ -110,7 +111,8 @@ func (idi *BuiltinRuleDataIngest) Ingest(ctx context.Context, ent protoreflect.P
}

return &engif.Result{
Object: resultObj,
Object: resultObj,
Checkpoint: checkpoints.NewCheckpointV1Now(),
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/engine/ingester/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/rs/zerolog"
"google.golang.org/protobuf/reflect/protoreflect"

engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/entities/checkpoints"
pbinternal "github.com/stacklok/minder/internal/proto"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
provifv1 "github.com/stacklok/minder/pkg/providers/v1"
Expand Down Expand Up @@ -122,6 +124,8 @@ func (di *Diff) Ingest(
Pr: pr,
Deps: allDiffs,
},
// NOTE: At this point we're only retrieving the timestamp as the checkpoint.
Checkpoint: checkpoints.NewCheckpointV1(time.Now()),
}, nil

case pb.DiffTypeFull:
Expand Down Expand Up @@ -152,6 +156,8 @@ func (di *Diff) Ingest(
Pr: pr,
Files: allDiffs,
},
// NOTE: At this point we're only retrieving the timestamp as the checkpoint.
Checkpoint: checkpoints.NewCheckpointV1Now(),
}, nil

default:
Expand Down
19 changes: 16 additions & 3 deletions internal/engine/ingester/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

engerrors "github.com/stacklok/minder/internal/engine/errors"
engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/entities/checkpoints"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
provifv1 "github.com/stacklok/minder/pkg/providers/v1"
)
Expand Down Expand Up @@ -102,10 +103,22 @@ func (gi *Git) Ingest(ctx context.Context, ent protoreflect.ProtoMessage, params
return nil, fmt.Errorf("could not get worktree: %w", err)
}

head, err := r.Head()
if err != nil {
return nil, fmt.Errorf("could not get head: %w", err)
}

hsh := head.Hash()

chkpoint := checkpoints.NewCheckpointV1Now().
WithBranch(branch).
WithCommitHash(hsh.String())

return &engif.Result{
Object: nil,
Fs: wt.Filesystem,
Storer: r.Storer,
Object: nil,
Fs: wt.Filesystem,
Storer: r.Storer,
Checkpoint: chkpoint,
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/engine/ingester/git/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestGitIngestWithCloneURLFromRepo(t *testing.T) {
require.NoError(t, err, "expected no error")
require.NotNil(t, got, "expected non-nil result")
require.NotNil(t, got.Fs, "expected non-nil fs")
require.NotNil(t, got.Checkpoint, "expected non-nil checkpoint")

fs := got.Fs
f, err := fs.Open("README")
Expand All @@ -65,6 +66,11 @@ func TestGitIngestWithCloneURLFromRepo(t *testing.T) {
require.NoError(t, err, "expected no error")

require.Contains(t, buf.String(), "Hello World", "expected README.md to contain Hello World")

require.NotNil(t, got.Checkpoint.Checkpoint.Branch, "expected non-nil branch")
require.Equal(t, "master", *got.Checkpoint.Checkpoint.Branch, "expected branch to be master")

require.NotNil(t, got.Checkpoint.Checkpoint.CommitHash, "expected non-nil commit")
}

func TestGitIngestWithCloneURLFromParams(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion internal/engine/ingester/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"

engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/entities/checkpoints"
"github.com/stacklok/minder/internal/util"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
provifv1 "github.com/stacklok/minder/pkg/providers/v1"
Expand Down Expand Up @@ -154,7 +155,8 @@ func (rdi *Ingestor) Ingest(ctx context.Context, ent protoreflect.ProtoMessage,
}

return &engif.Result{
Object: data,
Object: data,
Checkpoint: checkpoints.NewCheckpointV1Now(),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/engine/ingester/rest/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func TestRestIngest(t *testing.T) {
}

require.NoError(t, err, "unexpected error creating remediate engine")
require.Equal(t, tt.ingResultFn(), result, "unexpected result")
require.Equal(t, tt.ingResultFn().Object, result.Object, "unexpected result")
})
}
}
Expand Down
14 changes: 14 additions & 0 deletions internal/engine/interfaces/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/stacklok/minder/internal/db"
evalerrors "github.com/stacklok/minder/internal/engine/errors"
"github.com/stacklok/minder/internal/entities/checkpoints"
"github.com/stacklok/minder/internal/profiles/models"
)

Expand Down Expand Up @@ -60,6 +61,19 @@ type Result struct {
// FIXME: It might be cleaner to either wrap both Fs and Storer in a struct
// or pass out the git.Repository structure instead of the storer.
Storer storage.Storer

// Checkpoint is the checkpoint at which the ingestion was done. This is
// used to persist the state of the entity at ingestion time.
Checkpoint *checkpoints.CheckpointEnvelopeV1
}

// GetCheckpoint returns the checkpoint of the result
func (r *Result) GetCheckpoint() *checkpoints.CheckpointEnvelopeV1 {
if r == nil {
return nil
}

return r.Checkpoint
}

// ActionType represents the type of action, i.e., remediate, alert, etc.
Expand Down
29 changes: 28 additions & 1 deletion internal/entities/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
// Package checkpoints contains logic relating to checkpoint management for entities
package checkpoints

import "time"
import (
"encoding/json"
"time"
)

// V1 is the version string for the v1 format.
const V1 = "v1"
Expand Down Expand Up @@ -47,6 +50,11 @@ type CheckpointV1 struct {
Digest *string `json:"digest,omitempty" yaml:"digest,omitempty"`
}

// NewCheckpointV1Now creates a new CheckpointV1 with the current time.
func NewCheckpointV1Now() *CheckpointEnvelopeV1 {
return NewCheckpointV1(time.Now())
}

// NewCheckpointV1 creates a new CheckpointV1 with the given timestamp.
func NewCheckpointV1(timestamp time.Time) *CheckpointEnvelopeV1 {
return &CheckpointEnvelopeV1{
Expand Down Expand Up @@ -80,3 +88,22 @@ func (c *CheckpointEnvelopeV1) WithDigest(digest string) *CheckpointEnvelopeV1 {
c.Checkpoint.Digest = &digest
return c
}

// ToJSON marshals the checkpoint to JSON.
func (c *CheckpointEnvelopeV1) ToJSON() (json.RawMessage, error) {
return json.Marshal(c)
}

// ToJSONorDefault marshals the checkpoint to JSON or returns a default value.
func (c *CheckpointEnvelopeV1) ToJSONorDefault(def json.RawMessage) (json.RawMessage, error) {
if c == nil {
return def, nil
}

js, err := c.ToJSON()
if err != nil {
return def, err
}

return js, nil
}
2 changes: 1 addition & 1 deletion internal/entities/checkpoints/checkpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestCheckpointEnvelopeV1_MarshalJSON(t *testing.T) {
t.Parallel()

// Marshal the input to JSON
output, err := json.Marshal(tt.input)
output, err := tt.input.ToJSON()
require.NoError(t, err)

assert.Equal(t, string(output), tt.expected)
Expand Down
8 changes: 4 additions & 4 deletions internal/history/mock/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion internal/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type EvaluationHistoryService interface {
entityType db.Entities,
entityID uuid.UUID,
evalError error,
marshaledCheckpoint []byte,
) (uuid.UUID, error)
// ListEvaluationHistory returns a list of evaluations stored
// in the history table.
Expand Down Expand Up @@ -69,6 +70,7 @@ func (e *evaluationHistoryService) StoreEvaluationStatus(
entityType db.Entities,
entityID uuid.UUID,
evalError error,
marshaledCheckpoint []byte,
) (uuid.UUID, error) {
var ruleEntityID uuid.UUID
status := evalerrors.ErrorAsEvalStatus(evalError)
Expand Down Expand Up @@ -110,7 +112,7 @@ func (e *evaluationHistoryService) StoreEvaluationStatus(
ruleEntityID = latestRecord.RuleEntityID
}

evaluationID, err := e.createNewStatus(ctx, qtx, ruleEntityID, profileID, status, details)
evaluationID, err := e.createNewStatus(ctx, qtx, ruleEntityID, profileID, status, details, marshaledCheckpoint)
if err != nil {
return uuid.Nil, fmt.Errorf("error while creating new evaluation status for rule/entity %s: %w", ruleEntityID, err)
}
Expand All @@ -125,12 +127,14 @@ func (_ *evaluationHistoryService) createNewStatus(
profileID uuid.UUID,
status db.EvalStatusTypes,
details string,
marshaledCheckpoint []byte,
) (uuid.UUID, error) {
newEvaluationID, err := qtx.InsertEvaluationStatus(ctx,
db.InsertEvaluationStatusParams{
RuleEntityID: ruleEntityID,
Status: status,
Details: details,
Checkpoint: marshaledCheckpoint,
},
)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion internal/history/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func TestStoreEvaluationStatus(t *testing.T) {
}

service := NewEvaluationHistoryService()
id, err := service.StoreEvaluationStatus(ctx, store, ruleID, profileID, scenario.EntityType, entityID, errTest)
id, err := service.StoreEvaluationStatus(
ctx, store, ruleID, profileID, scenario.EntityType, entityID, errTest, []byte("{}"))
if scenario.ExpectedError == "" {
require.Equal(t, evaluationID, id)
require.NoError(t, err)
Expand Down