Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ASoC 2022] Optimization of Pixiu timeout feature #475

Merged
merged 16 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pixiu/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Client interface {
// Apply to init client
Apply() error

// Close close the clinet
// Close close the client
Close() error

// Call invoke the downstream service.
Expand Down
7 changes: 7 additions & 0 deletions pixiu/pkg/client/dubbo/dubbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (

import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/client"
cst "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
)
Expand Down Expand Up @@ -314,6 +315,12 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *generic.Ge
refConf.Retries = irequest.DubboBackendConfig.Retries
}

if dc.dubboProxyConfig.Timeout != nil {
refConf.RequestTimeout = dc.dubboProxyConfig.Timeout.RequestTimeoutStr
} else {
refConf.RequestTimeout = cst.DefaultReqTimeout.String()
}
logger.Debugf("[dubbo-go-pixiu] client dubbo timeout val %v", refConf.RequestTimeout)
dc.lock.Lock()
defer dc.lock.Unlock()

Expand Down
3 changes: 1 addition & 2 deletions pixiu/pkg/client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net/url"
"strings"
"sync"
"time"
)

import (
Expand Down Expand Up @@ -107,7 +106,7 @@ func (dc *Client) Call(req *client.Request) (resp interface{}, err error) {

newReq, _ := http.NewRequest(req.IngressRequest.Method, targetURL, params.Body)
newReq.Header = params.Header
httpClient := &http.Client{Timeout: 5 * time.Second}
httpClient := &http.Client{Timeout: req.Timeout}

tr := otel.Tracer(traceNameHTTPClient)
_, span := tr.Start(req.Context, "HTTP "+newReq.Method, trace.WithSpanKind(trace.SpanKindClient))
Expand Down
9 changes: 5 additions & 4 deletions pixiu/pkg/client/mq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type (
}

KafkaProducerConfig struct {
Brokers []string `yaml:"brokers" json:"brokers"`
ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"`
Metadata Metadata `yaml:"metadata" json:"metadata"`
Producer Producer `yaml:"producer" json:"producer"`
Brokers []string `yaml:"brokers" json:"brokers"`
ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"`
Metadata Metadata `yaml:"metadata" json:"metadata"`
Producer Producer `yaml:"producer" json:"producer"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
}

Metadata struct {
Expand Down
1 change: 1 addition & 0 deletions pixiu/pkg/client/mq/kafka_facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func NewKafkaProviderFacade(config KafkaProducerConfig) (*KafkaProducerFacade, e
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes
c.Producer.Timeout = config.Timeout
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pixiu/pkg/client/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewMQClient(config Config) (*Client, error) {
ctx := context.Background()
switch config.MqType {
case constant.MQTypeKafka:
config.KafkaProducerConfig.Timeout = config.Timeout
pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -125,7 +126,9 @@ func (c Client) Call(req *client.Request) (res interface{}, err error) {
consumerFacadeMap.Store(cReq.ConsumerGroup, facade)
if f, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok {
cf := f.(ConsumerFacade)
err = cf.Subscribe(c.ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup))
ctx, cancel := context.WithTimeout(c.ctx, req.Timeout)
defer cancel()
err = cf.Subscribe(ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup))
if err != nil {
facade.Stop()
consumerFacadeMap.Delete(cReq.ConsumerGroup)
Expand Down
2 changes: 2 additions & 0 deletions pixiu/pkg/client/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package client
import (
"context"
"net/http"
"time"
)

import (
Expand All @@ -32,6 +33,7 @@ type Request struct {
Context context.Context
IngressRequest *http.Request
API router.API
Timeout time.Duration
}

// NewReq create a request
Expand Down
4 changes: 3 additions & 1 deletion pixiu/pkg/client/triple/triple.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ func (dc *Client) Call(req *client.Request) (res interface{}, err error) {
}
meta := make(map[string][]string)
reqData, _ := io.ReadAll(req.IngressRequest.Body)
call, err := p.Call(context.Background(), req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta))
ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)
defer cancel()
call, err := p.Call(ctx, req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta))
if err != nil {
return "", errors.Errorf("call triple server error = %s", err)
}
Expand Down
6 changes: 6 additions & 0 deletions pixiu/pkg/common/constant/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package constant

import (
"time"
)

var (
Default403Body = []byte("403 for bidden")
Default404Body = []byte("404 page not found")
Expand All @@ -40,4 +44,6 @@ const (
LogDataBuffer = 5000
// console
Console = "console"

DefaultReqTimeout = 10 * time.Second
)
7 changes: 5 additions & 2 deletions pixiu/pkg/common/grpc/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ func (gcm *GrpcConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp
gcm.writeStatus(w, status.New(codes.Unknown, "can't find endpoint in cluster"))
return
}

newReq := r.Clone(context.Background())
ctx := context.Background()
// timeout
ctx, cancel := context.WithTimeout(ctx, gcm.config.Timeout)
defer cancel()
newReq := r.Clone(ctx)
newReq.URL.Scheme = "http"
newReq.URL.Host = endpoint.Address.GetAddress()

Expand Down
2 changes: 1 addition & 1 deletion pixiu/pkg/common/http/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (hcm *HttpConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp
hc.Writer = w
hc.Request = r
hc.Reset()

hc.Timeout = hcm.config.Timeout
err := hcm.Handle(hc)
if err != nil {
logger.Errorf("ServeHTTP %v", err)
Expand Down
13 changes: 13 additions & 0 deletions pixiu/pkg/common/util/stringutil/stringutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
perrors "github.com/pkg/errors"
"net"
"strings"
"time"
)

import (
Expand Down Expand Up @@ -106,3 +107,15 @@ func GetIPAndPort(address string) ([]*net.TCPAddr, error) {

return tcpAddr, nil
}

func ResolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration {
if currentV == "" {
return defaultV
} else {
if duration, err := time.ParseDuration(currentV); err != nil {
return defaultV
} else {
return duration
}
}
}
4 changes: 3 additions & 1 deletion pixiu/pkg/context/http/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ func (hc *HttpContext) LocalReply() bool {

// API sets the API to http context
func (hc *HttpContext) API(api router.API) {
hc.Timeout = api.Timeout
if hc.Timeout > api.Timeout {
hc.Timeout = api.Timeout
}
hc.Api = &api
}

Expand Down
2 changes: 2 additions & 0 deletions pixiu/pkg/filter/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi
}

func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
f.cfg.Timeout = ctx.Timeout
mqClient := mq.NewSingletonMQClient(*f.cfg)
req := client.NewReq(ctx.Request.Context(), ctx.Request, *ctx.GetAPI())
req.Timeout = ctx.Timeout
resp, err := mqClient.Call(req)
if err != nil {
logger.Errorf("[dubbo-go-pixiu] event client call err:%v!", err)
Expand Down
16 changes: 10 additions & 6 deletions pixiu/pkg/filter/http/grpcproxy/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
stdHttp "net/http"
"strings"
"sync"
"time"
)

import (
Expand Down Expand Up @@ -110,7 +111,8 @@ type (
Config struct {
DescriptorSourceStrategy DescriptorSourceStrategy `yaml:"descriptor_source_strategy" json:"descriptor_source_strategy" default:"auto"`
Path string `yaml:"path" json:"path"`
Rules []*Rule `yaml:"rules" json:"rules"` //nolint
Rules []*Rule `yaml:"rules" json:"rules"` //nolint
Timeout time.Duration `yaml:"timeout" json:"timeout"` //nolint
}

Rule struct {
Expand Down Expand Up @@ -190,7 +192,9 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte("cluster not exists"))
return filter.Stop
}

// timeout for Dial and Invoke
ctx, cancel := context.WithTimeout(c.Ctx, c.Timeout)
defer cancel()
ep := e.Address.GetAddress()

p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")]
Expand All @@ -201,7 +205,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
clientConn, ok = p.Get().(*grpc.ClientConn)
if !ok || clientConn == nil {
// TODO(Kenway): Support Credential and TLS
clientConn, err = grpc.DialContext(c.Ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials()))
clientConn, err = grpc.DialContext(ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil || clientConn == nil {
logger.Errorf("%s err {failed to connect to grpc service provider}", loggerHeader)
c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte((fmt.Sprintf("%s", err))))
Expand All @@ -210,14 +214,14 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
}

// get DescriptorSource, contain file and reflection
source, err := f.descriptor.getDescriptorSource(context.WithValue(c.Ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg)
source, err := f.descriptor.getDescriptorSource(context.WithValue(ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg)
if err != nil {
logger.Errorf("%s err %s : %s ", loggerHeader, "get desc source fail", err)
c.SendLocalReply(stdHttp.StatusInternalServerError, []byte("service not config proto file or the server not support reflection API"))
return filter.Stop
}
//put DescriptorSource concurrent, del if no need
c.Ctx = context.WithValue(c.Ctx, ct.ContextKey(DescriptorSourceKey), source)
ctx = context.WithValue(ctx, ct.ContextKey(DescriptorSourceKey), source)

dscp, err := source.FindSymbol(svc)
if err != nil {
Expand Down Expand Up @@ -256,7 +260,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {

// metadata in grpc has the same feature in http
md := mapHeaderToMetadata(c.AllHeaders())
ctx := metadata.NewOutgoingContext(c.Ctx, md)
ctx = metadata.NewOutgoingContext(ctx, md)

md = metadata.MD{}
t := metadata.MD{}
Expand Down
7 changes: 6 additions & 1 deletion pixiu/pkg/filter/http/httpproxy/routerfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus {
}
req.Header = r.Header

resp, err := (&http3.Client{Transport: f.transport}).Do(req)
cli := &http3.Client{
Transport: f.transport,
Timeout: hc.Timeout,
}

resp, err := cli.Do(req)
if err != nil {
panic(err)
}
Expand Down
12 changes: 6 additions & 6 deletions pixiu/pkg/filter/http/remote/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ import (
)

const (
open = iota
close
all
OPEN = iota
CLOSE
ALL
)

const (
Expand Down Expand Up @@ -99,7 +99,7 @@ func (factory *FilterFactory) Apply() error {
}
level := mockLevel(mock)
if level < 0 || level > 2 {
level = close
level = CLOSE
}
factory.conf.Level = level
// must init it at apply function
Expand All @@ -115,7 +115,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c
}

func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus {

if f.conf.Dpc.AutoResolve {
if err := f.resolve(c); err != nil {
c.SendLocalReply(http.StatusInternalServerError, []byte(fmt.Sprintf("auto resolve err: %s", err)))
Expand All @@ -125,7 +124,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus {

api := c.GetAPI()

if (f.conf.Level == open && api.Mock) || (f.conf.Level == all) {
if (f.conf.Level == OPEN && api.Mock) || (f.conf.Level == ALL) {
c.SourceResp = &contexthttp.ErrResponse{
Message: "mock success",
}
Expand All @@ -140,6 +139,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus {
}

req := client.NewReq(c.Request.Context(), c.Request, *api)
req.Timeout = c.Timeout
resp, err := cli.Call(req)
if err != nil {
logger.Errorf("[dubbo-go-pixiu] client call err:%v!", err)
Expand Down
2 changes: 2 additions & 0 deletions pixiu/pkg/filter/network/dubboproxy/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package dubboproxy
import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/extension/filter"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/util/stringutil"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
)

Expand All @@ -44,6 +45,7 @@ func (p *Plugin) Kind() string {
// CreateFilter create dubbo networkfilter
func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) {
hcmc, ok := config.(*model.DubboProxyConnectionManagerConfig)
hcmc.Timeout = stringutil.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout)
if !ok {
panic("CreateFilter occur some exception for the type is not suitable one.")
}
Expand Down
2 changes: 2 additions & 0 deletions pixiu/pkg/filter/network/grpcconnectionmanager/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/extension/filter"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/grpc"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/util/stringutil"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
)

Expand All @@ -44,6 +45,7 @@ func (p *Plugin) Kind() string {
// CreateFilter create grpc network filter
func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) {
hcmc := config.(*model.GRPCConnectionManagerConfig)
hcmc.Timeout = stringutil.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout)
return grpc.CreateGrpcConnectionManager(hcmc), nil
}

Expand Down
2 changes: 2 additions & 0 deletions pixiu/pkg/filter/network/httpconnectionmanager/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/extension/filter"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/http"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/util/stringutil"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
)

Expand All @@ -44,6 +45,7 @@ func (p *Plugin) Kind() string {
// CreateFilter create http network filter
func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) {
hcmc := config.(*model.HttpConnectionManagerConfig)
hcmc.Timeout = stringutil.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout)
return http.CreateHttpConnectionManager(hcmc), nil
}

Expand Down
Loading