Skip to content

RFC: controller2 #1263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions indexer/controller/v2/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package controller

import (
"context"
"errors"
"fmt"
"io"
"reflect"
"runtime"
"strings"
"time"

"github.com/quay/zlog"

"github.com/quay/claircore"
"github.com/quay/claircore/indexer"
)

type Controller struct {
store indexer.Store // NOTE(hank) This should be [datastore.Indexer].
fetcher indexer.FetchArena
}

func New(ctx context.Context,
store indexer.Store,
fetcher indexer.FetchArena,
) (*Controller, error) {
c := Controller{
store: store,
fetcher: fetcher,
}
return &c, nil
}

func (c *Controller) Index(ctx context.Context, m *claircore.Manifest) (*claircore.IndexReport, error) {
e, err := c.newExec(ctx)
if err != nil {
return nil, fmt.Errorf("controller: unable to construct execution context: %w", err)
}
defer func() {
if err := e.Close(); err != nil {
zlog.Info(ctx).
Err(err).
Msg("error closing resources")
}
}()

Run:
for !e.IsTerminal() {
// At the start of every step, check if the request's context is valid.
// If not, everything should be at a safe-point and we can just exit this loop.
select {
case <-ctx.Done():
err = context.Cause(ctx)
// Break directly to avoid messing with the exec struct.
break Run
default:
}

// Run this step.
// The execution should continue as long as the parent context is valid
// or a short interval after the parent context was canceled, whichever
// is longer.
func() {
ctx := zlog.ContextWithValues(ctx, "step", e.State.String())
defer func() {
zlog.Debug(ctx).
Err(err).
Stringer("next", e.State).
Msg("step ran")
}()

// Create & cleanup the step context.
sctx, cause := context.WithCancelCause(context.WithoutCancel(ctx))
stop := context.AfterFunc(ctx, func() { // NB Using the parent context.
time.Sleep(30 * time.Second) // BUG(hank) The per-step grace period is not configurable.
cause(fmt.Errorf("controller: %w: %w", errGracePeriod, context.Cause(ctx)))
})
defer func() {
// This is complicated because of the desired grace period behavior.
usedGrace := !stop()
err := sctx.Err() // Make sure to capture this before the unconditional CancelCause call.
cause(errStepComplete)
zlog.Debug(ctx).
Bool("used_grace_period", usedGrace).
Bool("timed_out", errors.Is(err, errGracePeriod)).
AnErr("cause", err).
Msg("ending step context")
}()

e.State, err = e.State(sctx, e, m)
}()

// All errors out of controller steps should either be of type *stepError,
// or be accompanied by a terminal stateFn.
var serr *stepError
switch {
case errors.Is(err, nil):
case errors.As(err, &serr):
panic("TODO: handle stepErr")
case e.IsTerminal():
// "Err" is not a *stepErr and is was with a terminal stateFn.
continue
default:
panic(fmt.Errorf("programmer error: previous step returned (%v, %v) ", e.State, err))
}

// TODO(hank) Do the database persistence.
}
switch {
case errors.Is(err, nil):
case errors.Is(err, context.Canceled):
// Log?
return nil, fmt.Errorf("controller: ended early: %w", err)
default:
return nil, fmt.Errorf("controller: fatal error: %w", err)
}

return e.Result, nil
}

func (c *Controller) newExec(ctx context.Context) (*exec, error) {
e := exec{
Store: c.store,
Realizer: c.fetcher.Realizer(ctx).(indexer.DescriptionRealizer),

Result: &claircore.IndexReport{
Packages: map[string]*claircore.Package{},
Environments: map[string][]*claircore.Environment{},
Distributions: map[string]*claircore.Distribution{},
Repositories: map[string]*claircore.Repository{},
Files: map[string]claircore.File{},
},
State: checkManifest,
}
return &e, nil
}

type stateFn func(context.Context, *exec, *claircore.Manifest) (stateFn, error)

func (f stateFn) String() (n string) {
if f == nil {
return "<Terminal>"
}
n = runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
_, n, _ = strings.Cut(n, "controller.")
return n
}

// Aliases for my sanity
type detector = indexer.VersionedScanner
type store = indexer.Store // Should be [datastore.Indexer]

type exec struct {
Store store
Detectors []detector
Realizer indexer.DescriptionRealizer

Defer []io.Closer
Result *claircore.IndexReport
State stateFn
}

func (e *exec) IsTerminal() bool {
return e.State == nil
}

func (e *exec) Close() error {
errs := make([]error, len(e.Defer)+1)
for i, c := range e.Defer {
errs[i] = c.Close()
}
errs[len(errs)-1] = e.Realizer.Close()
return errors.Join(errs...)
}
74 changes: 74 additions & 0 deletions indexer/controller/v2/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package controller

import (
"errors"
"fmt"
)

// Errors for the [Controller's] internal runloop.
var (
// ErrGracePeriod is a signal that the step's grace period started and expired.
errGracePeriod = errors.New("grace period exceeded")
// ErrStepComplete is used to stop the per-step context.
// This should not escape the step; it showing up outside the runloop means there's some wonky lifetimes.
errStepComplete = errors.New("step complete")
)

type stepError struct {
inner error
durability errorDurability
}

//go:generate go run golang.org/x/tools/cmd/stringer -type errorDurability -linecomment
type errorDurability uint

const (
// ErrKindUnspecified is the default; the error says nothing about its durability.
errKindUnspecified errorDurability = iota // unspecified
// ErrKindBlob indicates there's some feature of the blob that means this step will never return a positive result.
//
// Typically there's something wrong with the blob, like it not actually being the expected kind of data.
errKindBlob // blob
// ErrKindCode indicates there's a bug in the code and retrying after a code change may yield a different result.
errKindCode // code
// ErrKindTransient indicates there was an environmental issue, retrying may yield a different result.
errKindTransient // transient
)

func (e *stepError) Error() string {
if e.durability == errKindUnspecified {
return e.inner.Error()
}
return fmt.Sprintf("%v (durable for: %v)", e.inner, e.durability)
}
func (e *stepError) Unwrap() error {
return e.inner
}

var errPerLayer = errors.New("per-layer error")

type layerError struct {
layer, op string
inner error
}

// Error implements error.
func (e *layerError) Error() string {
return fmt.Sprintf("%s for layer %s: %v", e.op, e.layer, e.inner)
}

func (e *layerError) Unwrap() error {
return e.inner
}

func (e *layerError) Is(tgt error) bool {
return errors.Is(errPerLayer, tgt)
}

func newLayerError(which string, op string, err error) error {
return &layerError{
layer: which,
op: op,
inner: err,
}
}
26 changes: 26 additions & 0 deletions indexer/controller/v2/errordurability_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions indexer/controller/v2/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package controller

import (
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var (
meter = otel.Meter("github.com/quay/claircore/indexer/controller/v2")
tracer = otel.Tracer("github.com/quay/claircore/indexer/controller/v2")

stepCall metric.Int64Counter
)

var metricInit = sync.OnceValue(func() (err error) {
stepCall, err = meter.Int64Counter("step.count",
metric.WithUnit("{call}"),
metric.WithDescription("tktk"),
)
if err != nil {
return err
}
return nil
})

var (
stepAttrKey = attribute.Key("step")
)

func stepAttr(name string) attribute.KeyValue {
return stepAttrKey.String(name)
}
Loading