Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: global filter #376

Merged
merged 15 commits into from
Nov 26, 2021
Merged
108 changes: 61 additions & 47 deletions pkg/object/globalfilter/globalfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package globalfilter

import (
"fmt"
"sync"
"sync/atomic"

"github.com/megaease/easegress/pkg/protocol"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/object/httppipeline"
Expand All @@ -33,9 +35,6 @@ const (

// Kind is the kind of GlobalFilter.
Kind = "GlobalFilter"

beforePipelineKey = "before"
afterPipelineKey = "after"
)

type (
Expand All @@ -46,9 +45,8 @@ type (
superSpec *supervisor.Spec
spec *Spec

// pipelines map[string]*httppipeline.HTTPPipeline
// only contains two key: before and after
pipelines *sync.Map
beforePipeline atomic.Value
afterPipeline atomic.Value
}

// Spec describes the GlobalFilter.
Expand All @@ -73,29 +71,45 @@ func init() {
func (gf *GlobalFilter) CreateAndUpdateBeforePipelineForSpec(spec *Spec, previousGeneration *httppipeline.HTTPPipeline) error {
beforePipeline := &pipelineSpec{
Kind: httppipeline.Kind,
Name: beforePipelineKey,
Name: "before",
Spec: spec.BeforePipeline,
}
return gf.CreateAndUpdatePipeline(beforePipeline, previousGeneration)
pipeline, err := gf.CreateAndUpdatePipeline(beforePipeline, previousGeneration)
if err != nil {
return err
}
if pipeline == nil {
return fmt.Errorf("before pipeline is nil, spec: %v", beforePipeline)
}
gf.beforePipeline.Store(pipeline)
return nil
}

// CreateAndUpdateAfterPipelineForSpec ...
func (gf *GlobalFilter) CreateAndUpdateAfterPipelineForSpec(spec *Spec, previousGeneration *httppipeline.HTTPPipeline) error {
afterPipeline := &pipelineSpec{
Kind: httppipeline.Kind,
Name: beforePipelineKey,
Name: "after",
Spec: spec.AfterPipeline,
}
return gf.CreateAndUpdatePipeline(afterPipeline, previousGeneration)
pipeline, err := gf.CreateAndUpdatePipeline(afterPipeline, previousGeneration)
if err != nil {
return err
}
if pipeline == nil {
return fmt.Errorf("after pipeline is nil, spec: %v", afterPipeline)
}
gf.afterPipeline.Store(pipeline)
return nil
}

// CreateAndUpdatePipeline create and update globalFilter`s pipelines from pipeline spec
func (gf *GlobalFilter) CreateAndUpdatePipeline(spec *pipelineSpec, previousGeneration *httppipeline.HTTPPipeline) error {
func (gf *GlobalFilter) CreateAndUpdatePipeline(spec *pipelineSpec, previousGeneration *httppipeline.HTTPPipeline) (*httppipeline.HTTPPipeline, error) {
// init config
config := yamltool.Marshal(spec)
specS, err := supervisor.NewSpec(string(config))
if err != nil {
return err
return nil, err
}

// init or update pipeline
Expand All @@ -105,8 +119,7 @@ func (gf *GlobalFilter) CreateAndUpdatePipeline(spec *pipelineSpec, previousGene
} else {
pipeline.Init(specS, nil)
}
gf.pipelines.Store(spec.Name, pipeline)
return nil
return pipeline, nil
}

// Category returns the object category of itself.
Expand Down Expand Up @@ -135,7 +148,6 @@ func (gf *GlobalFilter) Status() *supervisor.Status {
// Init initializes GlobalFilter.
func (gf *GlobalFilter) Init(superSpec *supervisor.Spec) {
gf.superSpec, gf.spec = superSpec, superSpec.ObjectSpec().(*Spec)
gf.pipelines = &sync.Map{}
gf.reload(nil)
}

Expand All @@ -145,44 +157,49 @@ func (gf *GlobalFilter) Inherit(superSpec *supervisor.Spec, previousGeneration s
gf.reload(previousGeneration.(*GlobalFilter))
}

// BeforeHandle before handler logic for beforePipeline spec
func (gf *GlobalFilter) BeforeHandle(ctx context.HTTPContext) {
handler, ok := gf.getPipeline(beforePipelineKey)
if !ok {
func (gf *GlobalFilter) Handle(ctx context.HTTPContext, httpHandle protocol.HTTPHandler) {
result := gf.beforeHandle(ctx)
if result == httppipeline.LabelEND {
return
}
result = httpHandle.Handle(ctx)
if result == httppipeline.LabelEND {
return
}
handler.Handle(ctx)
gf.afterHandle(ctx)
return
}

// AfterHandle after handler logic for afterPipeline spec
func (gf *GlobalFilter) AfterHandle(ctx context.HTTPContext) {
handler, ok := gf.getPipeline(afterPipelineKey)
// BeforeHandle before handler logic for beforePipeline spec
func (gf *GlobalFilter) beforeHandle(ctx context.HTTPContext) string {
value := gf.beforePipeline.Load()
if value == nil {
return ""
}
handler, ok := value.(*httppipeline.HTTPPipeline)
if !ok {
return
return ""
}
handler.Handle(ctx)
return handler.Handle(ctx)
}

func (gf *GlobalFilter) getPipeline(key string) (*httppipeline.HTTPPipeline, bool) {
value, ok := gf.pipelines.Load(key)
if !ok || value == nil {
return nil, false
// AfterHandle after handler logic for afterPipeline spec
func (gf *GlobalFilter) afterHandle(ctx context.HTTPContext) string {
value := gf.afterPipeline.Load()
if value == nil {
return ""
}
handler, ok := value.(*httppipeline.HTTPPipeline)
if !ok {
return ""
}
pipe, ok := value.(*httppipeline.HTTPPipeline)
return pipe, ok
return handler.Handle(ctx)
}

// Close closes itself. It is called by deleting.
// Supervisor won't call Close for previous generation in Update.
func (gf *GlobalFilter) Close() {
gf.pipelines.Range(func(key, value interface{}) bool {
if v, ok := value.(*httppipeline.HTTPPipeline); ok {
v.Close()
}
return true
})
gf.pipelines.Delete(beforePipelineKey)
gf.pipelines.Delete(afterPipelineKey)

}

// Validate validates Spec.
Expand All @@ -202,14 +219,11 @@ func (s *Spec) Validate() (err error) {

func (gf *GlobalFilter) reload(previousGeneration *GlobalFilter) {
var beforePreviousPipeline, afterPreviousPipeline *httppipeline.HTTPPipeline
if previousGeneration != nil {
gf.pipelines = previousGeneration.pipelines
}
// create and update beforePipeline entity
if len(gf.spec.BeforePipeline.Flow) != 0 {
if previousGeneration != nil {
previous, ok := gf.pipelines.Load(beforePipelineKey)
if ok {
previous := previousGeneration.beforePipeline.Load()
if previous != nil {
beforePreviousPipeline = previous.(*httppipeline.HTTPPipeline)
}
}
Expand All @@ -221,8 +235,8 @@ func (gf *GlobalFilter) reload(previousGeneration *GlobalFilter) {
//create and update afterPipeline entity
if len(gf.spec.AfterPipeline.Flow) != 0 {
if previousGeneration != nil {
previous, ok := gf.pipelines.Load(beforePipelineKey)
if ok {
previous := previousGeneration.afterPipeline.Load()
if previous != nil {
afterPreviousPipeline = previous.(*httppipeline.HTTPPipeline)
}
}
Expand Down
26 changes: 16 additions & 10 deletions pkg/object/httppipeline/httppipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,11 @@ func (hp *HTTPPipeline) reload(previousGeneration *HTTPPipeline) {
hp.runningFilters = runningFilters
}

func (hp *HTTPPipeline) getNextFilterIndex(index int, result string) int {
// getNextFilterIndex return index and whether end
func (hp *HTTPPipeline) getNextFilterIndex(index int, result string) (int, bool) {
// return index + 1 if last filter succeeded
if result == "" {
return index + 1
return index + 1, false
}

// check the jumpIf table of current filter, return its index if the jump
Expand All @@ -417,31 +418,32 @@ func (hp *HTTPPipeline) getNextFilterIndex(index int, result string) int {
}

if len(filter.jumpIf) == 0 {
return -1
return -1, false
}
name, ok := filter.jumpIf[result]
if !ok {
return -1
return -1, false
}
if name == LabelEND {
return len(hp.runningFilters)
return len(hp.runningFilters), true
}

for index++; index < len(hp.runningFilters); index++ {
if hp.runningFilters[index].spec.Name() == name {
return index
return index, false
}
}

return -1
return -1, false
}

// Handle is the handler to deal with HTTP
func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) {
func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) string {
ctx.SetTemplate(hp.ht)

filterIndex := -1
filterStat := newFilterStat()
isEnd := false

handle := func(lastResult string) string {
// For saving the `filterIndex`'s filter generated HTTP Response.
Expand All @@ -467,7 +469,10 @@ func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) {
filterStat = lastStat
}()

filterIndex = hp.getNextFilterIndex(filterIndex, lastResult)
filterIndex, isEnd = hp.getNextFilterIndex(filterIndex, lastResult)
if isEnd {
return LabelEND // jumpIf end of pipeline
}
if filterIndex == len(hp.runningFilters) {
return "" // reach the end of pipeline
} else if filterIndex == -1 {
Expand Down Expand Up @@ -500,9 +505,10 @@ func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) {
}

ctx.SetHandlerCaller(handle)
handle("")
result := handle("")

ctx.AddTag(filterStat.marshalAndRelease())
return result
}

func (hp *HTTPPipeline) getRunningFilter(name string) *runningFilter {
Expand Down
13 changes: 7 additions & 6 deletions pkg/object/httppipeline/httppipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package httppipeline

import (
"reflect"
"testing"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/context/contexttest"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/supervisor"
"reflect"
"testing"
)

func CreateObjectMock(kind string) Filter {
Expand Down Expand Up @@ -275,16 +276,16 @@ filters:
httpPipeline.Inherit(superSpec, &httpPipeline, nil)

t.Run("test getNextFilterIndex", func(t *testing.T) {
if ind := httpPipeline.getNextFilterIndex(0, ""); ind != 1 {
if ind, end := httpPipeline.getNextFilterIndex(0, ""); ind != 1 && end != false {
t.Errorf("next index should be 1, was %d", ind)
}
if ind := httpPipeline.getNextFilterIndex(0, "invalid"); ind != 3 {
if ind, end := httpPipeline.getNextFilterIndex(0, "invalid"); ind != 3 && end != true {
t.Errorf("next index should be 3, was %d", ind)
}
if ind := httpPipeline.getNextFilterIndex(0, "unknown"); ind != -1 {
if ind, end := httpPipeline.getNextFilterIndex(0, "unknown"); ind != -1 && end != false {
t.Errorf("next index should be -1, was %d", ind)
}
if ind := httpPipeline.getNextFilterIndex(1, "specialCase"); ind != 2 {
if ind, end := httpPipeline.getNextFilterIndex(1, "specialCase"); ind != 2 && end != false {
t.Errorf("next index should be 2, was %d", ind)
}
})
Expand Down
28 changes: 8 additions & 20 deletions pkg/object/httpserver/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,13 @@ func (m *mux) handleRequestWithCache(rules *muxRules, ctx context.HTTPContext, c
path = ci.path.pathRE.ReplaceAllString(path, ci.path.rewriteTarget)
ctx.Request().SetPath(path)
}
m.beforeGlobalHTTPFilterHandle(ctx, rules)
handler.Handle(ctx)
m.afterGlobalHTTPFilterHandle(ctx, rules)
// global filter
globalFilter := m.getGlobalFilter(rules)
if globalFilter == nil {
handler.Handle(ctx)
return
}
globalFilter.Handle(ctx, handler)
}
}

Expand All @@ -477,23 +481,7 @@ func (m *mux) appendXForwardedFor(ctx context.HTTPContext) {
}
}

func (m *mux) beforeGlobalHTTPFilterHandle(ctx context.HTTPContext, rules *muxRules) {
globalFilters := m.getGlobalFilters(rules)
if globalFilters == nil {
return
}
globalFilters.BeforeHandle(ctx)
}

func (m *mux) afterGlobalHTTPFilterHandle(ctx context.HTTPContext, rules *muxRules) {
globalFilters := m.getGlobalFilters(rules)
if globalFilters == nil {
return
}
globalFilters.AfterHandle(ctx)
}

func (m *mux) getGlobalFilters(rules *muxRules) *globalfilter.GlobalFilter {
func (m *mux) getGlobalFilter(rules *muxRules) *globalfilter.GlobalFilter {
if rules.spec.GlobalFilter == "" {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type (
// HTTPHandler is the common handler for the all backends
// which handle the traffic from HTTPServer.
HTTPHandler interface {
Handle(ctx context.HTTPContext)
Handle(ctx context.HTTPContext) string
}

// MuxMapper gets HTTP handler pipeline with mutex
Expand Down