diff --git a/pkg/object/globalfilter/globalfilter.go b/pkg/object/globalfilter/globalfilter.go new file mode 100644 index 0000000000..4cbd69dcc0 --- /dev/null +++ b/pkg/object/globalfilter/globalfilter.go @@ -0,0 +1,248 @@ +/* + * Copyright (c) 2017, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package globalfilter + +import ( + "fmt" + "sync/atomic" + + "github.com/megaease/easegress/pkg/protocol" + + "github.com/megaease/easegress/pkg/context" + "github.com/megaease/easegress/pkg/object/httppipeline" + "github.com/megaease/easegress/pkg/supervisor" + "github.com/megaease/easegress/pkg/util/yamltool" +) + +const ( + // Category is the category of GlobalFilter. + Category = supervisor.CategoryBusinessController + + // Kind is the kind of GlobalFilter. + Kind = "GlobalFilter" +) + +type ( + // GlobalFilter is a business controller. + // It provides handler before and after pipeline in HTTPServer. + GlobalFilter struct { + super *supervisor.Supervisor + superSpec *supervisor.Spec + spec *Spec + + beforePipeline atomic.Value + afterPipeline atomic.Value + } + + // Spec describes the GlobalFilter. + Spec struct { + BeforePipeline httppipeline.Spec `yaml:"beforePipeline" jsonschema:"omitempty"` + AfterPipeline httppipeline.Spec `yaml:"afterPipeline" jsonschema:"omitempty"` + } + + // pipelineSpec defines httppipeline spec to create an httppipeline entity. + pipelineSpec struct { + Kind string `yaml:"kind" jsonschema:"omitempty"` + Name string `yaml:"name" jsonschema:"omitempty"` + httppipeline.Spec `yaml:",inline"` + } +) + +func init() { + supervisor.Register(&GlobalFilter{}) +} + +// CreateAndUpdateBeforePipelineForSpec creates beforPipeline if the spec is nil, otherwise it updates by the spec. +func (gf *GlobalFilter) CreateAndUpdateBeforePipelineForSpec(spec *Spec, previousGeneration *httppipeline.HTTPPipeline) error { + beforePipeline := &pipelineSpec{ + Kind: httppipeline.Kind, + Name: "before", + Spec: spec.BeforePipeline, + } + pipeline, err := gf.CreateAndUpdatePipeline(beforePipeline, previousGeneration) + if err != nil { + return err + } + if pipeline == nil { + return fmt.Errorf("before pipeline is nil, spec: %v", beforePipeline) + } + gf.beforePipeline.Store(pipeline) + return nil +} + +// CreateAndUpdateAfterPipelineForSpec creates afterPipeline if the spec is nil, otherwise it updates with the spec. +func (gf *GlobalFilter) CreateAndUpdateAfterPipelineForSpec(spec *Spec, previousGeneration *httppipeline.HTTPPipeline) error { + afterPipeline := &pipelineSpec{ + Kind: httppipeline.Kind, + Name: "after", + Spec: spec.AfterPipeline, + } + pipeline, err := gf.CreateAndUpdatePipeline(afterPipeline, previousGeneration) + if err != nil { + return err + } + if pipeline == nil { + return fmt.Errorf("after pipeline is nil, spec: %v", afterPipeline) + } + gf.afterPipeline.Store(pipeline) + return nil +} + +// CreateAndUpdatePipeline creates and updates GlobalFilter's pipelines. +func (gf *GlobalFilter) CreateAndUpdatePipeline(spec *pipelineSpec, previousGeneration *httppipeline.HTTPPipeline) (*httppipeline.HTTPPipeline, error) { + // init config + config := yamltool.Marshal(spec) + specs, err := supervisor.NewSpec(string(config)) + if err != nil { + return nil, err + } + + // init or update pipeline + var pipeline = new(httppipeline.HTTPPipeline) + if previousGeneration != nil { + pipeline.Inherit(specs, previousGeneration, nil) + } else { + pipeline.Init(specs, nil) + } + return pipeline, nil +} + +// Category returns the object category of itself. +func (gf *GlobalFilter) Category() supervisor.ObjectCategory { + return Category +} + +// Kind returns the unique kind name to represent itself. +func (gf *GlobalFilter) Kind() string { + return Kind +} + +// DefaultSpec returns the default spec. +// It must return a pointer to point a struct. +func (gf *GlobalFilter) DefaultSpec() interface{} { + return &Spec{} +} + +// Status returns its runtime status. +func (gf *GlobalFilter) Status() *supervisor.Status { + return &supervisor.Status{ + ObjectStatus: struct{}{}, + } +} + +// Init initializes GlobalFilter. +func (gf *GlobalFilter) Init(superSpec *supervisor.Spec) { + gf.superSpec, gf.spec = superSpec, superSpec.ObjectSpec().(*Spec) + gf.reload(nil) +} + +// Inherit inherits previous generation of GlobalFilter. +func (gf *GlobalFilter) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object) { + gf.superSpec, gf.spec = superSpec, superSpec.ObjectSpec().(*Spec) + gf.reload(previousGeneration.(*GlobalFilter)) +} + +// Handle `beforePipeline` and `afterPipeline` before and after the httpHandler is executed. +func (gf *GlobalFilter) Handle(ctx context.HTTPContext, httpHandle protocol.HTTPHandler) { + result := gf.beforeHandle(ctx) + if result == httppipeline.LabelEND { + return + } + result = httpHandle.Handle(ctx) + if result == httppipeline.LabelEND { + return + } + gf.afterHandle(ctx) + return +} + +// BeforeHandle before handler logic for beforePipeline spec. +func (gf *GlobalFilter) beforeHandle(ctx context.HTTPContext) string { + value := gf.beforePipeline.Load() + if value == nil { + return "" + } + handler, ok := value.(*httppipeline.HTTPPipeline) + if !ok { + return "" + } + return handler.Handle(ctx) +} + +// AfterHandle after handler logic for afterPipeline spec. +func (gf *GlobalFilter) afterHandle(ctx context.HTTPContext) string { + value := gf.afterPipeline.Load() + if value == nil { + return "" + } + handler, ok := value.(*httppipeline.HTTPPipeline) + if !ok { + return "" + } + return handler.Handle(ctx) +} + +// Close closes GlobalFilter itself. +func (gf *GlobalFilter) Close() { + +} + +// Validate validates Spec. +func (s *Spec) Validate() (err error) { + + err = s.BeforePipeline.Validate() + if err != nil { + return fmt.Errorf("before pipeline is invalid: %v", err) + } + err = s.AfterPipeline.Validate() + if err != nil { + return fmt.Errorf("after pipeline is invalid: %v", err) + } + + return nil +} + +func (gf *GlobalFilter) reload(previousGeneration *GlobalFilter) { + var beforePreviousPipeline, afterPreviousPipeline *httppipeline.HTTPPipeline + // create and update beforePipeline entity + if len(gf.spec.BeforePipeline.Flow) != 0 { + if previousGeneration != nil { + previous := previousGeneration.beforePipeline.Load() + if previous != nil { + beforePreviousPipeline = previous.(*httppipeline.HTTPPipeline) + } + } + err := gf.CreateAndUpdateBeforePipelineForSpec(gf.spec, beforePreviousPipeline) + if err != nil { + panic(fmt.Errorf("create before pipeline failed: %v", err)) + } + } + //create and update afterPipeline entity + if len(gf.spec.AfterPipeline.Flow) != 0 { + if previousGeneration != nil { + previous := previousGeneration.afterPipeline.Load() + if previous != nil { + afterPreviousPipeline = previous.(*httppipeline.HTTPPipeline) + } + } + err := gf.CreateAndUpdateAfterPipelineForSpec(gf.spec, afterPreviousPipeline) + if err != nil { + panic(fmt.Errorf("create after pipeline failed: %v", err)) + } + } +} diff --git a/pkg/object/httppipeline/httppipeline.go b/pkg/object/httppipeline/httppipeline.go index 5a750bf1df..a01dd5be88 100644 --- a/pkg/object/httppipeline/httppipeline.go +++ b/pkg/object/httppipeline/httppipeline.go @@ -402,10 +402,11 @@ func (hp *HTTPPipeline) reload(previousGeneration *HTTPPipeline) { hp.runningFilters = runningFilters } -func (hp *HTTPPipeline) getNextFilterIndex(index int, result string) int { +// getNextFilterIndex return filter index and whether jumped to the end of the pipeline. +func (hp *HTTPPipeline) getNextFilterIndex(index int, result string) (int, bool) { // return index + 1 if last filter succeeded if result == "" { - return index + 1 + return index + 1, false } // check the jumpIf table of current filter, return its index if the jump @@ -417,31 +418,32 @@ func (hp *HTTPPipeline) getNextFilterIndex(index int, result string) int { } if len(filter.jumpIf) == 0 { - return -1 + return -1, false } name, ok := filter.jumpIf[result] if !ok { - return -1 + return -1, false } if name == LabelEND { - return len(hp.runningFilters) + return len(hp.runningFilters), true } for index++; index < len(hp.runningFilters); index++ { if hp.runningFilters[index].spec.Name() == name { - return index + return index, false } } - return -1 + return -1, false } // Handle is the handler to deal with HTTP -func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) { +func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) string { ctx.SetTemplate(hp.ht) filterIndex := -1 filterStat := newFilterStat() + isEnd := false handle := func(lastResult string) string { // For saving the `filterIndex`'s filter generated HTTP Response. @@ -467,7 +469,10 @@ func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) { filterStat = lastStat }() - filterIndex = hp.getNextFilterIndex(filterIndex, lastResult) + filterIndex, isEnd = hp.getNextFilterIndex(filterIndex, lastResult) + if isEnd { + return LabelEND // jumpIf end of pipeline + } if filterIndex == len(hp.runningFilters) { return "" // reach the end of pipeline } else if filterIndex == -1 { @@ -500,9 +505,10 @@ func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) { } ctx.SetHandlerCaller(handle) - handle("") + result := handle("") ctx.AddTag(filterStat.marshalAndRelease()) + return result } func (hp *HTTPPipeline) getRunningFilter(name string) *runningFilter { diff --git a/pkg/object/httppipeline/httppipeline_test.go b/pkg/object/httppipeline/httppipeline_test.go index 6f5bf92a4e..02920e90bd 100644 --- a/pkg/object/httppipeline/httppipeline_test.go +++ b/pkg/object/httppipeline/httppipeline_test.go @@ -18,12 +18,13 @@ package httppipeline import ( + "reflect" + "testing" + "github.com/megaease/easegress/pkg/context" "github.com/megaease/easegress/pkg/context/contexttest" "github.com/megaease/easegress/pkg/logger" "github.com/megaease/easegress/pkg/supervisor" - "reflect" - "testing" ) func CreateObjectMock(kind string) Filter { @@ -275,16 +276,16 @@ filters: httpPipeline.Inherit(superSpec, &httpPipeline, nil) t.Run("test getNextFilterIndex", func(t *testing.T) { - if ind := httpPipeline.getNextFilterIndex(0, ""); ind != 1 { + if ind, end := httpPipeline.getNextFilterIndex(0, ""); ind != 1 && end != false { t.Errorf("next index should be 1, was %d", ind) } - if ind := httpPipeline.getNextFilterIndex(0, "invalid"); ind != 3 { + if ind, end := httpPipeline.getNextFilterIndex(0, "invalid"); ind != 3 && end != true { t.Errorf("next index should be 3, was %d", ind) } - if ind := httpPipeline.getNextFilterIndex(0, "unknown"); ind != -1 { + if ind, end := httpPipeline.getNextFilterIndex(0, "unknown"); ind != -1 && end != false { t.Errorf("next index should be -1, was %d", ind) } - if ind := httpPipeline.getNextFilterIndex(1, "specialCase"); ind != 2 { + if ind, end := httpPipeline.getNextFilterIndex(1, "specialCase"); ind != 2 && end != false { t.Errorf("next index should be 2, was %d", ind) } }) diff --git a/pkg/object/httpserver/mux.go b/pkg/object/httpserver/mux.go index 25fd1f5d2c..32d6653e3f 100644 --- a/pkg/object/httpserver/mux.go +++ b/pkg/object/httpserver/mux.go @@ -25,6 +25,8 @@ import ( "strings" "sync/atomic" + "github.com/megaease/easegress/pkg/object/globalfilter" + "github.com/megaease/easegress/pkg/context" "github.com/megaease/easegress/pkg/logger" "github.com/megaease/easegress/pkg/protocol" @@ -454,7 +456,13 @@ func (m *mux) handleRequestWithCache(rules *muxRules, ctx context.HTTPContext, c path = ci.path.pathRE.ReplaceAllString(path, ci.path.rewriteTarget) ctx.Request().SetPath(path) } - handler.Handle(ctx) + // global filter + globalFilter := m.getGlobalFilter(rules) + if globalFilter == nil { + handler.Handle(ctx) + return + } + globalFilter.Handle(ctx, handler) } } @@ -473,6 +481,21 @@ func (m *mux) appendXForwardedFor(ctx context.HTTPContext) { } } +func (m *mux) getGlobalFilter(rules *muxRules) *globalfilter.GlobalFilter { + if rules.spec.GlobalFilter == "" { + return nil + } + globalFilter, ok := rules.superSpec.Super().GetBusinessController(rules.spec.GlobalFilter) + if globalFilter == nil || !ok { + return nil + } + globalFilterInstance, ok := globalFilter.Instance().(*globalfilter.GlobalFilter) + if !ok { + return nil + } + return globalFilterInstance +} + func (m *mux) close() { rules := m.rules.Load().(*muxRules) err := rules.tracer.Close() diff --git a/pkg/object/httpserver/spec.go b/pkg/object/httpserver/spec.go index 030b11c5f9..15a491727e 100644 --- a/pkg/object/httpserver/spec.go +++ b/pkg/object/httpserver/spec.go @@ -54,6 +54,8 @@ type ( IPFilter *ipfilter.Spec `yaml:"ipFilter,omitempty" jsonschema:"omitempty"` Rules []*Rule `yaml:"rules" jsonschema:"omitempty"` + + GlobalFilter string `yaml:"globalFilter,omitempty" jsonschema:"omitempty"` } // Rule is first level entry of router. diff --git a/pkg/protocol/http.go b/pkg/protocol/http.go index 3fcb49a3ce..9c553b84d3 100644 --- a/pkg/protocol/http.go +++ b/pkg/protocol/http.go @@ -27,7 +27,7 @@ type ( // HTTPHandler is the common handler for the all backends // which handle the traffic from HTTPServer. HTTPHandler interface { - Handle(ctx context.HTTPContext) + Handle(ctx context.HTTPContext) string } // MuxMapper gets HTTP handler pipeline with mutex diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go index 5fad17c8fa..8c5f5c76b4 100644 --- a/pkg/registry/registry.go +++ b/pkg/registry/registry.go @@ -42,6 +42,7 @@ import ( _ "github.com/megaease/easegress/pkg/object/etcdserviceregistry" _ "github.com/megaease/easegress/pkg/object/eurekaserviceregistry" _ "github.com/megaease/easegress/pkg/object/function" + _ "github.com/megaease/easegress/pkg/object/globalfilter" _ "github.com/megaease/easegress/pkg/object/httppipeline" _ "github.com/megaease/easegress/pkg/object/httpserver" _ "github.com/megaease/easegress/pkg/object/ingresscontroller"