diff --git a/cmd/pixiu/pixiu.go b/cmd/pixiu/pixiu.go index d44311b88..df2e99a97 100644 --- a/cmd/pixiu/pixiu.go +++ b/cmd/pixiu/pixiu.go @@ -30,15 +30,6 @@ import ( "github.com/urfave/cli" ) -import ( - _ "github.com/apache/dubbo-go-pixiu/pkg/filter/accesslog" - _ "github.com/apache/dubbo-go-pixiu/pkg/filter/logger" - _ "github.com/apache/dubbo-go-pixiu/pkg/filter/recovery" - _ "github.com/apache/dubbo-go-pixiu/pkg/filter/remote" - _ "github.com/apache/dubbo-go-pixiu/pkg/filter/response" - _ "github.com/apache/dubbo-go-pixiu/pkg/filter/timeout" -) - // Version pixiu version var Version = "0.3.0" diff --git a/pkg/client/dubbo/dubbo.go b/pkg/client/dubbo/dubbo.go index 9e5454e36..510c3d6d3 100644 --- a/pkg/client/dubbo/dubbo.go +++ b/pkg/client/dubbo/dubbo.go @@ -260,7 +260,9 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *dg.Generic dc.lock.Lock() defer dc.lock.Unlock() referenceConfig.GenericLoad(key) - time.Sleep(200 * time.Millisecond) // sleep to wait invoker create + //TODO: fix it later + // sleep to wait invoker create + time.Sleep(500 * time.Millisecond) clientService := referenceConfig.GetRPCService().(*dg.GenericService) dc.GenericServicePool[key] = clientService diff --git a/pkg/config/api_config.go b/pkg/config/api_config.go index a23285f65..4e62f60a8 100644 --- a/pkg/config/api_config.go +++ b/pkg/config/api_config.go @@ -27,6 +27,7 @@ import ( import ( fc "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config" + fr "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config/ratelimit" etcdv3 "github.com/dubbogo/gost/database/kv/etcd/v3" perrors "github.com/pkg/errors" "go.etcd.io/etcd/clientv3" @@ -47,6 +48,12 @@ var ( lock sync.RWMutex ) +var ( + BASE_INFO_NAME = "name" + BASE_INFO_DESC = "description" + BASE_INFO_PFP = "pluginFilePath" +) + // APIConfigResourceListener defines api resource and method config listener interface type APIConfigResourceListener interface { // ResourceChange handle modify resource event @@ -61,6 +68,12 @@ type APIConfigResourceListener interface { MethodAdd(res fc.Resource, method fc.Method) bool // MethodDelete handle delete method event MethodDelete(res fc.Resource, method fc.Method) bool + + PluginPathChange(filePath string) + + PluginGroupChange(group []fc.PluginsGroup) + + RateLimitChange(*fr.Config) } // LoadAPIConfigFromFile load the api config from file @@ -72,7 +85,7 @@ func LoadAPIConfigFromFile(path string) (*fc.APIConfig, error) { apiConf := &fc.APIConfig{} err := yaml.UnmarshalYMLConfig(path, apiConf) if err != nil { - return nil, perrors.Errorf("unmarshalYmlConfig error %v", perrors.WithStack(err)) + return nil, perrors.Errorf("unmarshalYmlConfig error %s", perrors.WithStack(err)) } apiConfig = apiConf return apiConf, nil @@ -86,13 +99,13 @@ func LoadAPIConfig(metaConfig *model.APIMetaConfig) (*fc.APIConfig, error) { etcdv3.WithEndpoints(strings.Split(metaConfig.Address, ",")...), ) if err != nil { - return nil, perrors.Errorf("Init etcd client fail error %v", err) + return nil, perrors.Errorf("Init etcd client fail error %s", err) } client = tmpClient kList, vList, err := client.GetChildren(metaConfig.APIConfigPath) if err != nil { - return nil, perrors.Errorf("Get remote config fail error %v", err) + return nil, perrors.Errorf("Get remote config fail error %s", err) } if err = initAPIConfigFromKVList(kList, vList); err != nil { return nil, err @@ -105,11 +118,21 @@ func LoadAPIConfig(metaConfig *model.APIMetaConfig) (*fc.APIConfig, error) { func initAPIConfigFromKVList(kList, vList []string) error { var skList, svList, mkList, mvList []string + var baseInfo string + var pluginGroup []string + var rateLimit string for i, k := range kList { v := vList[i] + //handle base info + re := getCheckBaseInfoRegexp() + if m := re.Match([]byte(k)); m { + baseInfo = v + continue + } + // handle resource - re := getCheckResourceRegexp() + re = getCheckResourceRegexp() if m := re.Match([]byte(k)); m { skList = append(skList, k) svList = append(svList, v) @@ -122,18 +145,44 @@ func initAPIConfigFromKVList(kList, vList []string) error { mvList = append(mvList, v) continue } + + //handle plugin group + re = getCheckPluginsGroupRegexp() + if m := re.Match([]byte(k)); m { + pluginGroup = append(pluginGroup, v) + continue + } + + //handle rate limit config + re = getCheckRatelimitRegexp() + if m := re.Match([]byte(k)); m { + rateLimit = v + continue + } } lock.Lock() defer lock.Unlock() tmpApiConf := &fc.APIConfig{} + if err := initBaseInfoFromString(tmpApiConf, baseInfo); err != nil { + logger.Errorf("initBaseInfoFromString error %s", err) + return err + } if err := initAPIConfigServiceFromKvList(tmpApiConf, skList, svList); err != nil { - logger.Error("initAPIConfigServiceFromKvList error %v", err.Error()) + logger.Errorf("initAPIConfigServiceFromKvList error %s", err) return err } if err := initAPIConfigMethodFromKvList(tmpApiConf, mkList, mvList); err != nil { - logger.Error("initAPIConfigMethodFromKvList error %v", err.Error()) + logger.Errorf("initAPIConfigMethodFromKvList error %s", err) + return err + } + if err := initAPIConfigPluginsFromStringList(tmpApiConf, pluginGroup); err != nil { + logger.Errorf("initAPIConfigPluginsFromStringList error %s", err) + return err + } + if err := initAPIConfigRatelimitFromString(tmpApiConf, rateLimit); err != nil { + logger.Errorf("initAPIConfigRatelimitFromString error %s", err) return err } @@ -141,13 +190,55 @@ func initAPIConfigFromKVList(kList, vList []string) error { return nil } +func initBaseInfoFromString(conf *fc.APIConfig, str string) error { + properties := make(map[string]string, 8) + if err := yaml.UnmarshalYML([]byte(str), properties); err != nil { + logger.Errorf("unmarshalYmlConfig error %s", err) + return err + } + if v, ok := properties[BASE_INFO_NAME]; ok { + conf.Name = v + } + if v, ok := properties[BASE_INFO_DESC]; ok { + conf.Description = v + } + if v, ok := properties[BASE_INFO_PFP]; ok { + conf.PluginFilePath = v + } + return nil +} + +func initAPIConfigRatelimitFromString(conf *fc.APIConfig, str string) error { + c := fr.Config{} + if err := yaml.UnmarshalYML([]byte(str), &c); err != nil { + logger.Errorf("unmarshalYmlConfig error %s", err) + return err + } + conf.RateLimit = c + return nil +} + +func initAPIConfigPluginsFromStringList(conf *fc.APIConfig, plugins []string) error { + var groups []fc.PluginsGroup + for _, v := range plugins { + g := fc.PluginsGroup{} + if err := yaml.UnmarshalYML([]byte(v), &g); err != nil { + logger.Errorf("unmarshalYmlConfig error %s", err) + return err + } + groups = append(groups, g) + } + conf.PluginsGroup = groups + return nil +} + func initAPIConfigMethodFromKvList(config *fc.APIConfig, kList, vList []string) error { for i := range kList { v := vList[i] method := &fc.Method{} err := yaml.UnmarshalYML([]byte(v), method) if err != nil { - logger.Error("unmarshalYmlConfig error %v", err.Error()) + logger.Errorf("unmarshalYmlConfig error %s", err) return err } @@ -188,7 +279,7 @@ func initAPIConfigServiceFromKvList(config *fc.APIConfig, kList, vList []string) resource := &fc.Resource{} err := yaml.UnmarshalYML([]byte(v), resource) if err != nil { - logger.Error("unmarshalYmlConfig error %v", err.Error()) + logger.Errorf("unmarshalYmlConfig error %s", err) return err } @@ -219,7 +310,7 @@ func listenResourceAndMethodEvent(key string) bool { for { wc, err := client.WatchWithOption(key, clientv3.WithPrefix()) if err != nil { - logger.Warnf("Watch api config {key:%s} = error{%v}", key, err) + logger.Warnf("Watch api config {key:%s} = error{%s}", key, err) return false } @@ -275,7 +366,7 @@ func handleDeleteEvent(key, val []byte) { resourceIdStr := pathArray[len(pathArray)-1] ID, err := strconv.Atoi(resourceIdStr) if err != nil { - logger.Error("handleDeleteEvent ID is not int error %v", err) + logger.Errorf("handleDeleteEvent ID is not int error %s", err) return } deleteApiConfigResource(ID) @@ -292,18 +383,24 @@ func handleDeleteEvent(key, val []byte) { resourceIdStr := pathArray[len(pathArray)-3] resourceId, err := strconv.Atoi(resourceIdStr) if err != nil { - logger.Error("handleDeleteEvent ID is not int error %v", err) + logger.Errorf("handleDeleteEvent ID is not int error %s", err) return } methodIdStr := pathArray[len(pathArray)-1] methodId, err := strconv.Atoi(methodIdStr) if err != nil { - logger.Error("handleDeleteEvent ID is not int error %v", err) + logger.Errorf("handleDeleteEvent ID is not int error %s", err) return } deleteApiConfigMethod(resourceId, methodId) } + + re = getCheckRatelimitRegexp() + if m := re.Match(key); m { + empty := &fr.Config{} + listener.RateLimitChange(empty) + } } func handlePutEvent(key, val []byte) { @@ -315,7 +412,7 @@ func handlePutEvent(key, val []byte) { res := &fc.Resource{} err := yaml.UnmarshalYML(val, res) if err != nil { - logger.Error("handlePutEvent UnmarshalYML error %v", err) + logger.Errorf("handlePutEvent UnmarshalYML error %s", err) return } mergeApiConfigResource(*res) @@ -327,11 +424,32 @@ func handlePutEvent(key, val []byte) { res := &fc.Method{} err := yaml.UnmarshalYML(val, res) if err != nil { - logger.Error("handlePutEvent UnmarshalYML error %v", err) + logger.Errorf("handlePutEvent UnmarshalYML error %s", err) return } mergeApiConfigMethod(res.ResourcePath, *res) } + + //handle base info + re = getCheckBaseInfoRegexp() + if m := re.Match(key); m { + mergeBaseInfo(val) + return + } + + //handle plugins group + re = getCheckPluginsGroupRegexp() + if m := re.Match(key); m { + mergePluginGroup(val) + return + } + + //handle ratelimit + re = getCheckRatelimitRegexp() + if m := re.Match(key); m { + mergeRatelimit(val) + return + } } func deleteApiConfigResource(resourceId int) { @@ -361,6 +479,36 @@ func mergeApiConfigResource(val fc.Resource) { listener.ResourceAdd(val) } +func mergeRatelimit(val []byte) { + c := &fr.Config{} + if err := yaml.UnmarshalYML(val, c); err != nil { + logger.Errorf("unmarshalYmlConfig error %s", err) + return + } + apiConfig.RateLimit = *c + listener.RateLimitChange(c) +} + +func mergePluginGroup(val []byte) { + g := &fc.PluginsGroup{} + if err := yaml.UnmarshalYML(val, g); err != nil { + logger.Errorf("unmarshalYmlConfig error %s", err) + return + } + for i, v := range apiConfig.PluginsGroup { + if v.GroupName == g.GroupName { + apiConfig.PluginsGroup[i] = *g + } + } + listener.PluginGroupChange(apiConfig.PluginsGroup) +} + +func mergeBaseInfo(val []byte) { + _ = initBaseInfoFromString(apiConfig, string(val)) + + listener.PluginPathChange(apiConfig.PluginFilePath) +} + func deleteApiConfigMethod(resourceId, methodId int) { for _, resource := range apiConfig.Resources { if resource.ID != resourceId { @@ -401,12 +549,24 @@ func mergeApiConfigMethod(path string, val fc.Method) { } } +func getCheckBaseInfoRegexp() *regexp.Regexp { + return regexp.MustCompile(".+/base$") +} + func getCheckResourceRegexp() *regexp.Regexp { return regexp.MustCompile(".+/resources/[^/]+/?$") } func getExtractMethodRegexp() *regexp.Regexp { - return regexp.MustCompile("resources/([^/]+)/method/[^/]+/?$") + return regexp.MustCompile(".+/resources/([^/]+)/method/[^/]+/?$") +} + +func getCheckPluginsGroupRegexp() *regexp.Regexp { + return regexp.MustCompile(".+/filter/pluginGroup/[^/]+/?$") +} + +func getCheckRatelimitRegexp() *regexp.Regexp { + return regexp.MustCompile(".+/filter/ratelimit/[^/]+/?$") } // RegisterConfigListener register APIConfigListener diff --git a/pkg/filter/accesslog/access_log.go b/pkg/filter/accesslog/access_log.go index 6a1b40ea9..11a13fbd3 100644 --- a/pkg/filter/accesslog/access_log.go +++ b/pkg/filter/accesslog/access_log.go @@ -40,7 +40,7 @@ import ( var accessLogWriter = &model.AccessLogWriter{AccessLogDataChan: make(chan model.AccessLogData, constant.LogDataBuffer)} -func init() { +func Init() { extension.SetFilterFunc(constant.AccessLogFilter, accessLog()) accessLogWriter.Write() } diff --git a/pkg/filter/plugins/plugins.go b/pkg/filter/plugins/plugins.go index 3489d5869..f3bac9c41 100644 --- a/pkg/filter/plugins/plugins.go +++ b/pkg/filter/plugins/plugins.go @@ -38,6 +38,7 @@ var ( // url path -> filter chain filterChainCache = make(map[string]FilterChain) groupCache = make(map[string]map[string]WithFunc) + localFilePath = "" errEmptyConfig = errors.New("Empty plugin config") ) @@ -60,24 +61,33 @@ type WithFunc struct { fn context.FilterFunc } +func OnFilePathChange(filePath string) { + if len(filePath) == 0 { + return + } + localFilePath = filePath +} + // OnResourceUpdate update plugins cache map when api-resource update func OnResourceUpdate(resource *config.Resource) { InitFilterChainForResource(resource, "", nil) } // OnGroupUpdate update group cache -func OnGroupUpdate(groups []config.PluginsGroup, filePath string) { - InitPluginsGroup(groups, filePath) +func OnGroupUpdate(groups []config.PluginsGroup) { + InitPluginsGroup(groups, "") } // InitPluginsGroup prase api_config.yaml(pluginsGroup) to map[string][]PluginsWithFunc func InitPluginsGroup(groups []config.PluginsGroup, filePath string) { - if "" == filePath || len(groups) == 0 { + OnFilePathChange(filePath) + + if "" == localFilePath || len(groups) == 0 { return } // load file.so - pls, err := plugin.Open(filePath) + pls, err := plugin.Open(localFilePath) if nil != err { panic(err) } diff --git a/pkg/filter/ratelimit/rate_limit.go b/pkg/filter/ratelimit/rate_limit.go index 4bbf8cdba..c1f519670 100644 --- a/pkg/filter/ratelimit/rate_limit.go +++ b/pkg/filter/ratelimit/rate_limit.go @@ -42,7 +42,7 @@ import ( // Init cache the filter func & init sentinel func Init(config *ratelimit.Config) { if err := rateLimitInit(config); err != nil { - logger.Errorf("rate limit init fail: %v", err) + logger.Errorf("rate limit init fail: %s", err) //if sentinel init fail, just return a empty filter func to avoid error. extension.SetFilterFunc(constant.RateLimitFilter, func(context fc.Context) {}) diff --git a/pkg/initialize/init.go b/pkg/initialize/init.go index ca90e3b84..9aa7a25ea 100644 --- a/pkg/initialize/init.go +++ b/pkg/initialize/init.go @@ -18,6 +18,7 @@ package initialize import ( + "github.com/apache/dubbo-go-pixiu/pkg/filter/accesslog" "github.com/apache/dubbo-go-pixiu/pkg/filter/api" "github.com/apache/dubbo-go-pixiu/pkg/filter/authority" "github.com/apache/dubbo-go-pixiu/pkg/filter/logger" @@ -41,6 +42,7 @@ func Run(config config.APIConfig) { } func filterInit(config *config.APIConfig) { + accesslog.Init() api.Init() authority.Init() logger.Init() diff --git a/pkg/service/api/discovery_service.go b/pkg/service/api/discovery_service.go index 0042b6a23..ce7a79f6c 100644 --- a/pkg/service/api/discovery_service.go +++ b/pkg/service/api/discovery_service.go @@ -27,12 +27,15 @@ import ( "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/common/extension" pc "github.com/apache/dubbo-go-pixiu/pkg/config" + "github.com/apache/dubbo-go-pixiu/pkg/filter/plugins" + "github.com/apache/dubbo-go-pixiu/pkg/filter/ratelimit" "github.com/apache/dubbo-go-pixiu/pkg/router" "github.com/apache/dubbo-go-pixiu/pkg/service" ) import ( "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config" + ratelimitConf "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config/ratelimit" fr "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/router" ) @@ -54,13 +57,13 @@ func NewLocalMemoryAPIDiscoveryService() *LocalMemoryAPIDiscoveryService { } // AddAPI adds a method to the router tree -func (ads *LocalMemoryAPIDiscoveryService) AddAPI(api fr.API) error { - return ads.router.PutAPI(api) +func (l *LocalMemoryAPIDiscoveryService) AddAPI(api fr.API) error { + return l.router.PutAPI(api) } // GetAPI returns the method to the caller -func (ads *LocalMemoryAPIDiscoveryService) GetAPI(url string, httpVerb config.HTTPVerb) (fr.API, error) { - if api, ok := ads.router.FindAPI(url, httpVerb); ok { +func (l *LocalMemoryAPIDiscoveryService) GetAPI(url string, httpVerb config.HTTPVerb) (fr.API, error) { + if api, ok := l.router.FindAPI(url, httpVerb); ok { return *api, nil } @@ -68,80 +71,79 @@ func (ads *LocalMemoryAPIDiscoveryService) GetAPI(url string, httpVerb config.HT } // ClearAPI clear all api -func (ads *LocalMemoryAPIDiscoveryService) ClearAPI() error { - ads.router.ClearAPI() - return nil +func (l *LocalMemoryAPIDiscoveryService) ClearAPI() error { + return l.router.ClearAPI() } // RemoveAPIByPath remove all api belonged to path -func (ads *LocalMemoryAPIDiscoveryService) RemoveAPIByPath(deleted config.Resource) error { +func (l *LocalMemoryAPIDiscoveryService) RemoveAPIByPath(deleted config.Resource) error { _, groupPath := getDefaultPath() fullPath := getFullPath(groupPath, deleted.Path) - ads.router.DeleteNode(fullPath) + l.router.DeleteNode(fullPath) return nil } // RemoveAPIByPath remove all api -func (ads *LocalMemoryAPIDiscoveryService) RemoveAPI(fullPath string, method config.Method) error { - ads.router.DeleteAPI(fullPath, method.HTTPVerb) +func (l *LocalMemoryAPIDiscoveryService) RemoveAPI(fullPath string, method config.Method) error { + l.router.DeleteAPI(fullPath, method.HTTPVerb) return nil } // ResourceChange handle modify resource event -func (ads *LocalMemoryAPIDiscoveryService) ResourceChange(new config.Resource, old config.Resource) bool { - if err := modifyAPIFromResource(new, old, ads); err == nil { +func (l *LocalMemoryAPIDiscoveryService) ResourceChange(new config.Resource, old config.Resource) bool { + if err := modifyAPIFromResource(new, old, l); err == nil { return true } return false } // ResourceAdd handle add resource event -func (ads *LocalMemoryAPIDiscoveryService) ResourceAdd(res config.Resource) bool { +func (l *LocalMemoryAPIDiscoveryService) ResourceAdd(res config.Resource) bool { parentPath, groupPath := getDefaultPath() fullHeaders := make(map[string]string, 9) - if err := addAPIFromResource(res, ads, groupPath, parentPath, fullHeaders); err == nil { + if err := addAPIFromResource(res, l, groupPath, parentPath, fullHeaders); err == nil { return true } return false } // ResourceDelete handle delete resource event -func (ads *LocalMemoryAPIDiscoveryService) ResourceDelete(deleted config.Resource) bool { - if err := deleteAPIFromResource(deleted, ads); err == nil { +func (l *LocalMemoryAPIDiscoveryService) ResourceDelete(deleted config.Resource) bool { + if err := deleteAPIFromResource(deleted, l); err == nil { return true } return false } // MethodChange handle modify method event -func (ads *LocalMemoryAPIDiscoveryService) MethodChange(res config.Resource, new config.Method, old config.Method) bool { +func (l *LocalMemoryAPIDiscoveryService) MethodChange(res config.Resource, new config.Method, old config.Method) bool { _, groupPath := getDefaultPath() fullPath := getFullPath(groupPath, res.Path) fullHeaders := make(map[string]string, 9) - if err := modifyAPIFromMethod(fullPath, new, old, fullHeaders, ads); err == nil { + if err := modifyAPIFromMethod(fullPath, new, old, fullHeaders, l); err == nil { return true } return false } // MethodAdd handle add method event -func (ads *LocalMemoryAPIDiscoveryService) MethodAdd(res config.Resource, method config.Method) bool { +func (l *LocalMemoryAPIDiscoveryService) MethodAdd(res config.Resource, method config.Method) bool { _, groupPath := getDefaultPath() fullPath := getFullPath(groupPath, res.Path) fullHeaders := make(map[string]string, 9) - if err := addAPIFromMethod(fullPath, method, fullHeaders, ads); err == nil { + if err := addAPIFromMethod(fullPath, method, fullHeaders, l); err == nil { return true } return false } // MethodDelete handle delete method event -func (ads *LocalMemoryAPIDiscoveryService) MethodDelete(res config.Resource, method config.Method) bool { +func (l *LocalMemoryAPIDiscoveryService) MethodDelete(res config.Resource, method config.Method) bool { _, groupPath := getDefaultPath() fullPath := getFullPath(groupPath, res.Path) - if err := deleteAPIFromMethod(fullPath, method, ads); err == nil { + if err := deleteAPIFromMethod(fullPath, method, l); err == nil { return true } return false @@ -269,3 +271,15 @@ func loadAPIFromMethods(fullPath string, methods []config.Method, headers map[st } return nil } + +func (l *LocalMemoryAPIDiscoveryService) PluginPathChange(filePath string) { + plugins.OnFilePathChange(filePath) +} + +func (l *LocalMemoryAPIDiscoveryService) PluginGroupChange(group []config.PluginsGroup) { + plugins.OnGroupUpdate(group) +} + +func (l *LocalMemoryAPIDiscoveryService) RateLimitChange(c *ratelimitConf.Config) { + ratelimit.OnUpdate(c) +}