Skip to content

Commit

Permalink
Merge pull request apache#1 from pantianying/etcd_metadata_report
Browse files Browse the repository at this point in the history
Etcd metadata report
  • Loading branch information
flycash authored Jun 16, 2020
2 parents d6a68d5 + 05a6146 commit 0965296
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 4 deletions.
161 changes: 161 additions & 0 deletions metadata/report/etcd/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package etcd

import (
"encoding/json"
"strings"
"time"
)

import (
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/metadata/report/factory"
"github.com/apache/dubbo-go/remoting/etcdv3"
)

const DEFAULT_ROOT = "dubbo"

func init() {
extension.SetMetadataReportFactory("etcd", func() factory.MetadataReportFactory {
return &etcdMetadataReportFactory{}
})
}

// etcdMetadataReport is the implementation of MetadataReport based etcd
type etcdMetadataReport struct {
client *etcdv3.Client
root string
}

// StoreProviderMetadata will store the metadata
// metadata including the basic info of the server, provider info, and other user custom info
func (e *etcdMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
key := e.getNodeKey(providerIdentifier)
return e.client.Create(key, serviceDefinitions)
}

// StoreConsumerMetadata will store the metadata
// metadata including the basic info of the server, consumer info, and other user custom info
func (e *etcdMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
key := e.getNodeKey(consumerMetadataIdentifier)
return e.client.Create(key, serviceParameterString)
}

// SaveServiceMetadata will store the metadata
// metadata including the basic info of the server, service info, and other user custom info
func (e *etcdMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
key := e.getNodeKey(metadataIdentifier)
return e.client.Create(key, url.String())
}

// RemoveServiceMetadata will remove the service metadata
func (e *etcdMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
return e.client.Delete(e.getNodeKey(metadataIdentifier))
}

// GetExportedURLs will look up the exported urls.
// if not found, an empty list will be returned.
func (e *etcdMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) []string {
content, err := e.client.Get(e.getNodeKey(metadataIdentifier))
if err != nil {
logger.Errorf("etcdMetadataReport GetExportedURLs err:{%v}", err.Error())
return nil
}
if content == "" {
return []string{}
}
return []string{content}
}

// SaveSubscribedData will convert the urlList to json array and then store it
func (e *etcdMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urlList []common.URL) error {
if len(urlList) == 0 {
logger.Warnf("The url list is empty")
return nil
}
urlStrList := make([]string, 0, len(urlList))

for _, e := range urlList {
urlStrList = append(urlStrList, e.String())
}

bytes, err := json.Marshal(urlStrList)

if err != nil {
return perrors.WithMessage(err, "Could not convert the array to json")
}
key := e.getNodeKey(subscriberMetadataIdentifier)
return e.client.Create(key, string(bytes))
}

// GetSubscribedURLs will lookup the url
// if not found, an empty list will be returned
func (e *etcdMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) []string {
content, err := e.client.Get(e.getNodeKey(subscriberMetadataIdentifier))
if err != nil {
logger.Errorf("etcdMetadataReport GetSubscribedURLs err:{%v}", err.Error())
}
return []string{content}
}

// GetServiceDefinition will lookup the service definition
func (e *etcdMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) string {
key := e.getNodeKey(metadataIdentifier)
content, err := e.client.Get(key)
if err != nil {
logger.Errorf("etcdMetadataReport GetServiceDefinition err:{%v}", err.Error())
return ""
}
return content
}

type etcdMetadataReportFactory struct{}

// CreateMetadataReport get the MetadataReport instance of etcd
func (e *etcdMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
timeout, _ := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
addresses := strings.Split(url.Location, ",")
client, err := etcdv3.NewClient(etcdv3.MetadataETCDV3Client, addresses, timeout, 1)
if err != nil {
logger.Errorf("Could not create etcd metadata report. URL: %s,error:{%v}", url.String(), err)
return nil
}
group := url.GetParam(constant.GROUP_KEY, DEFAULT_ROOT)
group = constant.PATH_SEPARATOR + strings.TrimPrefix(group, constant.PATH_SEPARATOR)
return &etcdMetadataReport{client: client, root: group}
}

func (e *etcdMetadataReport) getNodeKey(MetadataIdentifier identifier.IMetadataIdentifier) string {
var rootDir string
if e.root == constant.PATH_SEPARATOR {
rootDir = e.root
} else {
rootDir = e.root + constant.PATH_SEPARATOR
}
return rootDir + MetadataIdentifier.GetFilePathKey()
}
131 changes: 131 additions & 0 deletions metadata/report/etcd/report_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package etcd

import (
"net/url"
"strconv"
"testing"
)

import (
"github.com/coreos/etcd/embed"
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/metadata/identifier"
)

const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-registry.etcd"

func initEtcd(t *testing.T) *embed.Etcd {
DefaultListenPeerURLs := "http://localhost:2380"
DefaultListenClientURLs := "http://localhost:2379"
lpurl, _ := url.Parse(DefaultListenPeerURLs)
lcurl, _ := url.Parse(DefaultListenClientURLs)
cfg := embed.NewConfig()
cfg.LPUrls = []url.URL{*lpurl}
cfg.LCUrls = []url.URL{*lcurl}
cfg.Dir = defaultEtcdV3WorkDir
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
return e
}

func TestEtcdMetadataReportFactory_CreateMetadataReport(t *testing.T) {
e := initEtcd(t)
url, err := common.NewURL("registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
if err != nil {
t.Fatal(err)
}
metadataReportFactory := &etcdMetadataReportFactory{}
metadataReport := metadataReportFactory.CreateMetadataReport(&url)
assert.NotNil(t, metadataReport)
e.Close()
}

func TestEtcdMetadataReport_CRUD(t *testing.T) {
e := initEtcd(t)
url, err := common.NewURL("registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
if err != nil {
t.Fatal(err)
}
metadataReportFactory := &etcdMetadataReportFactory{}
metadataReport := metadataReportFactory.CreateMetadataReport(&url)
assert.NotNil(t, metadataReport)

err = metadataReport.StoreConsumerMetadata(newMetadataIdentifier("consumer"), "consumer metadata")
assert.Nil(t, err)

err = metadataReport.StoreProviderMetadata(newMetadataIdentifier("provider"), "provider metadata")
assert.Nil(t, err)

serviceMi := newServiceMetadataIdentifier()
serviceUrl, _ := common.NewURL("registry://localhost:8848", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
metadataReport.SaveServiceMetadata(serviceMi, serviceUrl)
assert.Nil(t, err)

subMi := newSubscribeMetadataIdentifier()
urlList := make([]common.URL, 0, 1)
urlList = append(urlList, serviceUrl)
err = metadataReport.SaveSubscribedData(subMi, urlList)
assert.Nil(t, err)

err = metadataReport.RemoveServiceMetadata(serviceMi)
assert.Nil(t, err)

e.Close()
}

func newSubscribeMetadataIdentifier() *identifier.SubscriberMetadataIdentifier {
return &identifier.SubscriberMetadataIdentifier{
Revision: "subscribe",
MetadataIdentifier: *newMetadataIdentifier("provider"),
}

}

func newServiceMetadataIdentifier() *identifier.ServiceMetadataIdentifier {
return &identifier.ServiceMetadataIdentifier{
Protocol: "nacos",
Revision: "a",
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: "com.test.MyTest",
Version: "1.0.0",
Group: "test_group",
Side: "service",
},
}
}

func newMetadataIdentifier(side string) *identifier.MetadataIdentifier {
return &identifier.MetadataIdentifier{
Application: "test",
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: "com.test.MyTest",
Version: "1.0.0",
Group: "test_group",
Side: side,
},
}
}
8 changes: 5 additions & 3 deletions remoting/etcdv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
MaxFailTimes = 15
// RegistryETCDV3Client client name
RegistryETCDV3Client = "etcd registry"
// metadataETCDV3Client client name
MetadataETCDV3Client = "etcd metadata"
)

var (
Expand Down Expand Up @@ -107,7 +109,7 @@ func ValidateClient(container clientFacade, opts ...Option) error {

// new Client
if container.Client() == nil {
newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat)
newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat)
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
Expand All @@ -119,7 +121,7 @@ func ValidateClient(container clientFacade, opts ...Option) error {
// Client lose connection with etcd server
if container.Client().rawClient == nil {

newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat)
newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat)
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
Expand Down Expand Up @@ -149,7 +151,7 @@ type Client struct {
Wait sync.WaitGroup
}

func newClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) {
func NewClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) {

ctx, cancel := context.WithCancel(context.Background())
rawClient, err := clientv3.New(clientv3.Config{
Expand Down
2 changes: 1 addition & 1 deletion remoting/etcdv3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (suite *ClientTestSuite) TearDownSuite() {
}

func (suite *ClientTestSuite) setUpClient() *Client {
c, err := newClient(suite.etcdConfig.name,
c, err := NewClient(suite.etcdConfig.name,
suite.etcdConfig.endpoints,
suite.etcdConfig.timeout,
suite.etcdConfig.heartbeat)
Expand Down

0 comments on commit 0965296

Please sign in to comment.