Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: global filter #376

Merged
merged 15 commits into from
Nov 26, 2021
Merged
217 changes: 217 additions & 0 deletions pkg/object/globalfilter/globalfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package globalfilter

import (
"fmt"
"sync"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/yamltool"
)

const (
// Category is the category of GlobalFilter.
Category = supervisor.CategoryBusinessController

// Kind is the kind of GlobalFilter.
Kind = "GlobalFilter"

beforePipelineKey = "before"
afterPipelineKey = "after"
)

type (
// GlobalFilter is a business controller
// provide handler before and after pipeline in HTTPServer
GlobalFilter struct {
super *supervisor.Supervisor
superSpec *supervisor.Spec
spec *Spec

// pipelines map[string]*httppipeline.HTTPPipeline
// only contains two key: before and after
pipelines *sync.Map
}

// Spec describes the GlobalFilter.
Spec struct {
BeforePipeline httppipeline.Spec `yaml:"beforePipeline" jsonschema:"omitempty"`
AfterPipeline httppipeline.Spec `yaml:"afterPipeline" jsonschema:"omitempty"`
}

// pipelineSpec define httppipeline spec to create a httppipeline entity
pipelineSpec struct {
Kind string `yaml:"kind" jsonschema:"omitempty"`
Name string `yaml:"name" jsonschema:"omitempty"`
httppipeline.Spec `yaml:",inline"`
}
)

func init() {
supervisor.Register(&GlobalFilter{})
}

// CreateAndUpdateBeforePipelineForSpec ...
func (gf *GlobalFilter) CreateAndUpdateBeforePipelineForSpec(spec *Spec, previousGeneration *httppipeline.HTTPPipeline) error {
beforePipeline := &pipelineSpec{
Kind: httppipeline.Kind,
Name: beforePipelineKey,
Spec: spec.BeforePipeline,
}
return gf.CreateAndUpdatePipeline(beforePipeline, previousGeneration)
}

// CreateAndUpdateAfterPipelineForSpec ...
func (gf *GlobalFilter) CreateAndUpdateAfterPipelineForSpec(spec *Spec, previousGeneration *httppipeline.HTTPPipeline) error {
afterPipeline := &pipelineSpec{
Kind: httppipeline.Kind,
Name: beforePipelineKey,
Spec: spec.AfterPipeline,
}
return gf.CreateAndUpdatePipeline(afterPipeline, previousGeneration)
}

// CreateAndUpdatePipeline create and update globalFilter`s pipelines from pipeline spec
func (gf *GlobalFilter) CreateAndUpdatePipeline(spec *pipelineSpec, previousGeneration *httppipeline.HTTPPipeline) error {
// init config
config := yamltool.Marshal(spec)
specS, err := supervisor.NewSpec(string(config))
if err != nil {
return err
}

// init or update pipeline
var pipeline = new(httppipeline.HTTPPipeline)
if previousGeneration != nil {
pipeline.Inherit(specS, previousGeneration, nil)
} else {
pipeline.Init(specS, nil)
}
gf.pipelines.Store(spec.Name, pipeline)
return nil
}

// Category returns the object category of itself.
func (gf *GlobalFilter) Category() supervisor.ObjectCategory {
return Category
}

// Kind returns the unique kind name to represent itself.
func (gf *GlobalFilter) Kind() string {
return Kind
}

// DefaultSpec returns the default spec.
// It must return a pointer to point a struct.
func (gf *GlobalFilter) DefaultSpec() interface{} {
return &Spec{}
}

// Status returns its runtime status.
func (gf *GlobalFilter) Status() *supervisor.Status {
return &supervisor.Status{
ObjectStatus: struct{}{},
}
}

// Init initializes GlobalFilter.
func (gf *GlobalFilter) Init(superSpec *supervisor.Spec) {
gf.superSpec, gf.spec = superSpec, superSpec.ObjectSpec().(*Spec)
gf.pipelines = &sync.Map{}
gf.reload(nil)
}

// Inherit inherits previous generation of GlobalFilter.
func (gf *GlobalFilter) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object) {
gf.superSpec, gf.spec = superSpec, superSpec.ObjectSpec().(*Spec)
gf.reload(previousGeneration.(*GlobalFilter))
}

// BeforeHandle before handler logic for beforePipeline spec
func (gf *GlobalFilter) BeforeHandle(ctx context.HTTPContext) {
handler, ok := gf.getPipeline(beforePipelineKey)
if !ok {
return
}
handler.Handle(ctx)
}

// AfterHandle after handler logic for afterPipeline spec
func (gf *GlobalFilter) AfterHandle(ctx context.HTTPContext) {
handler, ok := gf.getPipeline(afterPipelineKey)
if !ok {
return
}
handler.Handle(ctx)
}

func (gf *GlobalFilter) getPipeline(key string) (*httppipeline.HTTPPipeline, bool) {
value, ok := gf.pipelines.Load(key)
if !ok || value == nil {
return nil, false
}
pipe, ok := value.(*httppipeline.HTTPPipeline)
return pipe, ok
}

// Close closes itself. It is called by deleting.
// Supervisor won't call Close for previous generation in Update.
func (gf *GlobalFilter) Close() {
gf.pipelines.Range(func(key, value interface{}) bool {
if v, ok := value.(*httppipeline.HTTPPipeline); ok {
v.Close()
}
return true
})
gf.pipelines.Delete(beforePipelineKey)
gf.pipelines.Delete(afterPipelineKey)
}

// Validate validates Spec.
func (s *Spec) Validate() (err error) {

err = s.BeforePipeline.Validate()
if err != nil {
return fmt.Errorf("before pipeline is invalidate err: %v", err)
}
err = s.AfterPipeline.Validate()
if err != nil {
return fmt.Errorf("after pipeline is invalidate err: %v", err)
}

return nil
}

func (gf *GlobalFilter) reload(previousGeneration *GlobalFilter) {
var beforePreviousPipeline, afterPreviousPipeline *httppipeline.HTTPPipeline
if previousGeneration != nil {
gf.pipelines = previousGeneration.pipelines
}
// create and update beforePipeline entity
if len(gf.spec.BeforePipeline.Flow) != 0 {
if previousGeneration != nil {
previous, ok := gf.pipelines.Load(beforePipelineKey)
if ok {
beforePreviousPipeline = previous.(*httppipeline.HTTPPipeline)
}
}
err := gf.CreateAndUpdateBeforePipelineForSpec(gf.spec, beforePreviousPipeline)
if err != nil {
panic(fmt.Sprintf("create before pipeline error %v", err))
}
}
//create and update afterPipeline entity
if len(gf.spec.AfterPipeline.Flow) != 0 {
if previousGeneration != nil {
previous, ok := gf.pipelines.Load(beforePipelineKey)
if ok {
afterPreviousPipeline = previous.(*httppipeline.HTTPPipeline)
}
}
err := gf.CreateAndUpdateAfterPipelineForSpec(gf.spec, afterPreviousPipeline)
if err != nil {
panic(fmt.Sprintf("create after pipeline error %v", err))
}
}
}
31 changes: 31 additions & 0 deletions pkg/object/httpserver/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"sync/atomic"

"github.com/megaease/easegress/pkg/object/globalfilter"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocol"
Expand Down Expand Up @@ -454,7 +456,9 @@ func (m *mux) handleRequestWithCache(rules *muxRules, ctx context.HTTPContext, c
path = ci.path.pathRE.ReplaceAllString(path, ci.path.rewriteTarget)
ctx.Request().SetPath(path)
}
m.beforeGlobalHTTPFilterHandle(ctx, rules)
handler.Handle(ctx)
m.afterGlobalHTTPFilterHandle(ctx, rules)
}
}

Expand All @@ -473,6 +477,33 @@ func (m *mux) appendXForwardedFor(ctx context.HTTPContext) {
}
}

func (m *mux) beforeGlobalHTTPFilterHandle(ctx context.HTTPContext, rules *muxRules) {
globalFilters := m.getGlobalFilters(rules)
if globalFilters == nil {
return
}
globalFilters.BeforeHandle(ctx)
}

func (m *mux) afterGlobalHTTPFilterHandle(ctx context.HTTPContext, rules *muxRules) {
globalFilters := m.getGlobalFilters(rules)
if globalFilters == nil {
return
}
globalFilters.AfterHandle(ctx)
}

func (m *mux) getGlobalFilters(rules *muxRules) *globalfilter.GlobalFilter {
if rules.spec.GlobalFilter == "" {
return nil
}
globalFilter, ok := rules.superSpec.Super().GetBusinessController(rules.spec.GlobalFilter)
if globalFilter == nil || !ok {
return nil
}
return globalFilter.Instance().(*globalfilter.GlobalFilter)
}

func (m *mux) close() {
rules := m.rules.Load().(*muxRules)
err := rules.tracer.Close()
Expand Down
2 changes: 2 additions & 0 deletions pkg/object/httpserver/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type (

IPFilter *ipfilter.Spec `yaml:"ipFilter,omitempty" jsonschema:"omitempty"`
Rules []*Rule `yaml:"rules" jsonschema:"omitempty"`

GlobalFilter string `yaml:"globalFilter,omitempty" jsonschema:"omitempty"`
}

// Rule is first level entry of router.
Expand Down
1 change: 1 addition & 0 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
_ "github.com/megaease/easegress/pkg/object/etcdserviceregistry"
_ "github.com/megaease/easegress/pkg/object/eurekaserviceregistry"
_ "github.com/megaease/easegress/pkg/object/function"
_ "github.com/megaease/easegress/pkg/object/globalfilter"
_ "github.com/megaease/easegress/pkg/object/httppipeline"
_ "github.com/megaease/easegress/pkg/object/httpserver"
_ "github.com/megaease/easegress/pkg/object/ingresscontroller"
Expand Down