Skip to content

Commit

Permalink
rate limit filter (#169)
Browse files Browse the repository at this point in the history
* update dubbo-go to v1.5.6 & fmt sth.

* fix fmt

* Update go.mod

* update dubbo-go to v1.5.6 & fmt sth.

* update dubbo-go to v1.5.6 & fmt sth.

* fix fmt

* rateLimit filter

* add licence header

* fix imports

* add license header

* add license header

* fix ci

* fix matcher var inline

* fix ci

* fix sync.RWMutex

* errors.warp

* test table

* remove unreachable statement

* Pattern pattern,omitempty

* update develop

* update the docs for ratelimit

* update develop

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

Co-authored-by: Zhenxu Ke <[email protected]>
  • Loading branch information
mark4z and kezhenxu94 authored Jun 24, 2021
1 parent 4bae729 commit fbaf7ed
Show file tree
Hide file tree
Showing 40 changed files with 966 additions and 337 deletions.
32 changes: 31 additions & 1 deletion configs/api_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,34 @@ pluginsGroup:
- name: "trace"
version: "0.0.1"
priority: 1000
externalLookupName: "ExternalPluginTrace"
externalLookupName: "ExternalPluginTrace"
rateLimit:
resources:
- name: test-dubbo
items:
#Exact
- matchStrategy: 0
pattern: "/api/v1/test-dubbo/user"
#Regex
- matchStrategy: 1
pattern: "/api/v1/test-dubbo/user/*"
- name: test-http
items:
#Exact
- matchStrategy: 0
pattern: "/api/v1/http/foo"
- matchStrategy: 0
pattern: "/api/v1/http/bar"
#Regex
- matchStrategy: 1
pattern: "/api/v1/http/foo/*"
- matchStrategy: 1
pattern: "/api/v1/http/bar/*"

rules:
#qps sample At most 100 requests can be passed in 1000ms, so qps is 100
- resource: "test-dubbo"
enable: true
flowRule:
threshold: 100
statintervalinms: 1000
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [dubbo](sample/dubbo.md)
* [http](sample/http.md)
* [mock](sample/mock.md)
* [plugins](../samples/plugins/index.md)

## [User Guide](user/README.md)

Expand Down
19 changes: 12 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,29 @@ module github.com/apache/dubbo-go-pixiu
go 1.14

require (
github.com/apache/dubbo-go v1.5.5
github.com/apache/dubbo-go-hessian2 v1.7.0
github.com/alibaba/sentinel-golang v1.0.2
github.com/apache/dubbo-go v1.5.7-rc1
github.com/apache/dubbo-go-hessian2 v1.9.2
github.com/coreos/etcd v3.3.25+incompatible
github.com/dubbogo/dubbo-go-pixiu-filter v0.1.4-0.20210427062645-0bec837d429e
github.com/dubbogo/go-zookeeper v1.0.2
github.com/dubbogo/dubbo-go-pixiu-filter v0.1.4-0.20210613012702-8488bf80772c
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.7
github.com/emirpasic/gods v1.12.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/goinggo/mapstructure v0.0.0-20140717182941-194205d9b4a9
github.com/hashicorp/consul/api v1.5.0
github.com/pkg/errors v0.9.1
github.com/shirou/gopsutil v3.21.3+incompatible // indirect
github.com/spf13/cast v1.3.1
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/urfave/cli v1.22.4
go.uber.org/zap v1.16.0
google.golang.org/grpc v1.26.0
google.golang.org/grpc v1.33.1
gopkg.in/yaml.v2 v2.4.0
)

replace github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473 => github.com/envoyproxy/go-control-plane v0.8.0
replace (
github.com/envoyproxy/go-control-plane => github.com/envoyproxy/go-control-plane v0.8.0
google.golang.org/grpc v1.33.1 => google.golang.org/grpc v1.26.0
)
390 changes: 113 additions & 277 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
RecoveryFilter = "dgp.filters.recovery"
ResponseFilter = "dgp.filters.response"
AccessLogFilter = "dgp.filters.access_log"
RateLimitFilter = "dgp.filters.rate_limit"
)

const (
Expand Down
74 changes: 49 additions & 25 deletions pkg/filter/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@ import (
)

var (
apiURLWithPluginsMap = make(map[string]FilterChain)
groupWithPluginsMap = make(map[string]map[string]WithFunc)
errEmptyPluginConfig = errors.New("Empty plugin config")
// url path -> filter chain
filterChainCache = make(map[string]FilterChain)
groupCache = make(map[string]map[string]WithFunc)

errEmptyConfig = errors.New("Empty plugin config")
)

func Init(groups []config.PluginsGroup, filePath string, resources []config.Resource) {
InitPluginsGroup(groups, filePath)
InitFilterChainCache(resources)
}

// FilterChain include Pre & Post filters
type FilterChain struct {
Pre context.FilterChain
Expand All @@ -53,6 +60,16 @@ type WithFunc struct {
fn context.FilterFunc
}

// OnResourceUpdate update plugins cache map when api-resource update
func OnResourceUpdate(resource *config.Resource) {
InitFilterChainForResource(resource, "", nil)
}

// OnGroupUpdate update group cache
func OnGroupUpdate(groups []config.PluginsGroup, filePath string) {
InitPluginsGroup(groups, filePath)
}

// InitPluginsGroup prase api_config.yaml(pluginsGroup) to map[string][]PluginsWithFunc
func InitPluginsGroup(groups []config.PluginsGroup, filePath string) {
if "" == filePath || len(groups) == 0 {
Expand All @@ -65,6 +82,8 @@ func InitPluginsGroup(groups []config.PluginsGroup, filePath string) {
panic(err)
}

newGroupMap := make(map[string]map[string]WithFunc, len(groups))

for _, group := range groups {
pwdMap := make(map[string]WithFunc, len(group.Plugins))

Expand All @@ -74,12 +93,13 @@ func InitPluginsGroup(groups []config.PluginsGroup, filePath string) {
pwdMap[pl.Name] = pwf
}

groupWithPluginsMap[group.GroupName] = pwdMap
newGroupMap[group.GroupName] = pwdMap
}
groupCache = newGroupMap
}

// InitAPIURLWithFilterChain must behind InitPluginsGroup call
func InitAPIURLWithFilterChain(resources []config.Resource) {
// InitFilterChainCache must behind InitPluginsGroup call
func InitFilterChainCache(resources []config.Resource) {
pairURLWithFilterChain("", resources, nil)
}

Expand All @@ -94,32 +114,36 @@ func pairURLWithFilterChain(parentPath string, resources []config.Resource, pare
}

for _, resource := range resources {
fullPath := groupPath + resource.Path
if !strings.HasPrefix(resource.Path, constant.PathSlash) {
continue
}
InitFilterChainForResource(&resource, groupPath, parentFilterChains)
}
}

func InitFilterChainForResource(resource *config.Resource, groupPath string, parent *FilterChain) {
fullPath := groupPath + resource.Path
if !strings.HasPrefix(resource.Path, constant.PathSlash) {
return
}

currentFilterChains, err := getAPIFilterChains(&resource.Plugins)
currentFilterChains, err := getAPIFilterChains(&resource.Plugins)

if err == nil {
apiURLWithPluginsMap[fullPath] = *currentFilterChains
parentFilterChains = currentFilterChains
} else {
if parentFilterChains != nil {
apiURLWithPluginsMap[fullPath] = *parentFilterChains
}
if err == nil {
filterChainCache[fullPath] = *currentFilterChains
parent = currentFilterChains
} else {
if parent != nil {
filterChainCache[fullPath] = *parent
}
}

if len(resource.Resources) > 0 {
pairURLWithFilterChain(resource.Path, resource.Resources, parentFilterChains)
}
if len(resource.Resources) > 0 {
pairURLWithFilterChain(resource.Path, resource.Resources, parent)
}
}

// GetAPIFilterFuncsWithAPIURL is get filterchain with path
func GetAPIFilterFuncsWithAPIURL(url string) FilterChain {
// found from cache
if flc, found := apiURLWithPluginsMap[url]; found {
if flc, found := filterChainCache[url]; found {
logger.Debugf("GetExternalPlugins is:%v", flc)
return flc
}
Expand All @@ -141,7 +165,7 @@ func loadExternalPlugin(p *config.Plugin, pl *plugin.Plugin) context.FilterFunc
return sbf().Do()
}

panic(errEmptyPluginConfig)
panic(errEmptyConfig)
}

func getAPIFilterChains(pluginsConfig *config.PluginsConfig) (fcs *FilterChain, err error) {
Expand All @@ -165,7 +189,7 @@ func getAPIFilterFuncsWithPluginsGroup(plu *config.PluginsInUse) []context.Filte

// found with group name
for _, groupName := range plu.GroupNames {
pwfMap, found := groupWithPluginsMap[groupName]
pwfMap, found := groupCache[groupName]
if found {
for _, pwf := range pwfMap {
tmpMap[pwf.Name] = pwf.fn
Expand All @@ -174,7 +198,7 @@ func getAPIFilterFuncsWithPluginsGroup(plu *config.PluginsInUse) []context.Filte
}

// found with with name from all group
for _, group := range groupWithPluginsMap {
for _, group := range groupCache {
for _, name := range plu.PluginNames {
if pwf, found := group[name]; found {
tmpMap[pwf.Name] = pwf.fn
Expand Down
4 changes: 2 additions & 2 deletions pkg/filter/plugins/plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func TestInitApiUrlWithFilterChain(t *testing.T) {
assert.Empty(t, err)

InitPluginsGroup(apiConfig.PluginsGroup, apiConfig.PluginFilePath)
InitAPIURLWithFilterChain(apiConfig.Resources)
InitFilterChainCache(apiConfig.Resources)
}

func TestGetApiFilterFuncsWithApiUrl(t *testing.T) {
apiConfig, err := config.LoadAPIConfigFromFile(mockFile)
assert.Empty(t, err)

InitPluginsGroup(apiConfig.PluginsGroup, apiConfig.PluginFilePath)
InitAPIURLWithFilterChain(apiConfig.Resources)
InitFilterChainCache(apiConfig.Resources)

flc := GetAPIFilterFuncsWithAPIURL("/")
assert.Equal(t, 0, len(flc.Pre))
Expand Down
73 changes: 73 additions & 0 deletions pkg/filter/ratelimit/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ratelimit

import (
sentinel "github.com/alibaba/sentinel-golang/api"
sc "github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
"github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config/ratelimit"
"github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/filter/ratelimit/matcher"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
)

func rateLimitInit(c *ratelimit.Config) error {
sentinelConf := sc.NewDefaultConfig()
if len(c.LogPath) > 0 {
sentinelConf.Sentinel.Log.Dir = c.LogPath
}
_ = logging.ResetGlobalLogger(getWrappedLogger())

if err := sentinel.InitWithConfig(sentinelConf); err != nil {
return errors.Wrap(err, "rate limit init fail")
}
matcher.Init()

OnUpdate(c)
return nil
}

// OnUpdate update api & rule
func OnUpdate(c *ratelimit.Config) {
OnResourcesUpdate(c.Resources)
OnRulesUpdate(c.Rules)
}

// OnRulesUpdate update rule
func OnRulesUpdate(rules []ratelimit.Rule) {
var enableRules []*flow.Rule
for _, v := range rules {
if v.Enable {
enableRules = append(enableRules, &v.FlowRule)
}
}

if _, err := flow.LoadRules(enableRules); err != nil {
logger.Warnf("rate limit load rules err: %v", err)
}
}

// OnResourcesUpdate update matcher for resources
func OnResourcesUpdate(apis []ratelimit.Resource) {
matcher.Load(apis)
}
33 changes: 33 additions & 0 deletions pkg/filter/ratelimit/init_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ratelimit

import "testing"

func TestInit(t *testing.T) {
c := GetMockedRateLimitConfig()
err := rateLimitInit(c)
if err != nil {
t.Fatal(err)
}
}

func TestOnUpdate(t *testing.T) {
config := GetMockedRateLimitConfig()
OnUpdate(config)
}
Loading

0 comments on commit fbaf7ed

Please sign in to comment.