Skip to content

Commit

Permalink
Fix: Graceful shutdown bugs(supplement apache#1254) (apache#1257)
Browse files Browse the repository at this point in the history
* supplementary fix apache#1254

remove unused comments

fix import cycle

append apache license header

fix gracefulShutdownFilter unittest bug

go fmt

fix gracefulShutdownConfig unittest bug

fix gracefulShutdownConfig unittest bug

go fmt

* improve formatting based on code style

* go fmt

* set RequestsFinished explicitly

* use mutex to protect variables of ShutdownConfig

* ftr: add config (apache#1258)

* recover gracefulShutdownFilter logic

* remove unused mutex

Co-authored-by: Laurence <[email protected]>
  • Loading branch information
2 people authored and mark4z committed Jul 5, 2021
1 parent 0d0bd99 commit 912e120
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 17 deletions.
24 changes: 24 additions & 0 deletions config/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package config

import (
"context"
"path/filepath"
"sort"
"sync"
Expand All @@ -41,7 +42,9 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/common/proxy/proxy_factory"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/metadata/service"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/registry"
)

Expand Down Expand Up @@ -74,6 +77,13 @@ func TestConfigLoader(t *testing.T) {
}

func TestLoad(t *testing.T) {
extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
return &mockGracefulShutdownFilter{}
})
extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
return &mockGracefulShutdownFilter{}
})

doInitConsumer()
doInitProvider()

Expand Down Expand Up @@ -596,3 +606,17 @@ func ConvertURLArrToIntfArr(urls []*common.URL) []interface{} {
}
return res
}

type mockGracefulShutdownFilter struct{}

func (f *mockGracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
panic("implement me")
}

func (f *mockGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
panic("implement me")
}

func (f *mockGracefulShutdownFilter) Set(name string, config interface{}) {
return
}
26 changes: 26 additions & 0 deletions config/config_setter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

const (
GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
)

type Setter interface {
Set(name string, config interface{})
}
8 changes: 8 additions & 0 deletions config/graceful_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func GracefulShutdownInit() {

signal.Notify(signals, ShutdownSignals...)

// retrieve ShutdownConfig for gracefulShutdownFilter
if filter, ok := extension.GetFilter(constant.CONSUMER_SHUTDOWN_FILTER).(Setter); ok && GetConsumerConfig().ShutdownConfig != nil {
filter.Set(GracefulShutdownFilterShutdownConfig, GetConsumerConfig().ShutdownConfig)
}
if filter, ok := extension.GetFilter(constant.PROVIDER_SHUTDOWN_FILTER).(Setter); ok && GetProviderConfig().ShutdownConfig != nil {
filter.Set(GracefulShutdownFilterShutdownConfig, GetProviderConfig().ShutdownConfig)
}

go func() {
select {
case sig := <-signals:
Expand Down
9 changes: 9 additions & 0 deletions config/graceful_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

func TestGracefulShutdownInit(t *testing.T) {
extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
return &mockGracefulShutdownFilter{}
})
extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
return &mockGracefulShutdownFilter{}
})
GracefulShutdownInit()
}

Expand All @@ -49,6 +56,8 @@ func TestBeforeShutdown(t *testing.T) {
}

// without configuration
consumerConfig = nil
providerConfig = nil
BeforeShutdown()

consumerConfig = &ConsumerConfig{
Expand Down
30 changes: 17 additions & 13 deletions filter/filter_impl/graceful_shutdown_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,13 @@ import (
)

func init() {
consumerFiler := &gracefulShutdownFilter{
shutdownConfig: config.GetConsumerConfig().ShutdownConfig,
}
providerFilter := &gracefulShutdownFilter{
shutdownConfig: config.GetProviderConfig().ShutdownConfig,
}

// `init()` is performed before config.Load(), so shutdownConfig will be retrieved after config was loaded.
extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
return consumerFiler
return &gracefulShutdownFilter{}
})

extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
return providerFilter
return &gracefulShutdownFilter{}
})
}

Expand All @@ -60,22 +54,32 @@ func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.I
return gf.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation)
}
atomic.AddInt32(&gf.activeCount, 1)
if gf.shutdownConfig != nil && gf.activeCount > 0 {
gf.shutdownConfig.RequestsFinished = false
}
return invoker.Invoke(ctx, invocation)
}

// OnResponse reduces the number of active processes then return the process result
func (gf *gracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
atomic.AddInt32(&gf.activeCount, -1)
// although this isn't thread safe, it won't be a problem if the gf.rejectNewRequest() is true.
if gf.shutdownConfig != nil && gf.activeCount <= 0 {
if gf.shutdownConfig != nil && gf.shutdownConfig.RejectRequest && gf.activeCount <= 0 {
gf.shutdownConfig.RequestsFinished = true
}
return result
}

func (gf *gracefulShutdownFilter) Set(name string, conf interface{}) {
switch name {
case config.GracefulShutdownFilterShutdownConfig:
if shutdownConfig, ok := conf.(*config.ShutdownConfig); !ok {
gf.shutdownConfig = shutdownConfig
return
}
logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", config.GracefulShutdownFilterShutdownConfig)
default:
// do nothing
}
}

func (gf *gracefulShutdownFilter) rejectNewRequest() bool {
if gf.shutdownConfig == nil {
return false
Expand Down
5 changes: 2 additions & 3 deletions filter/filter_impl/tps_limit_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ func TestGenericFilterInvokeWithDefaultTpsLimiterNotAllow(t *testing.T) {
attch := make(map[string]interface{})

result := tpsFilter.Invoke(context.Background(),
protocol.NewBaseInvoker(

invokeUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch))
protocol.NewBaseInvoker(invokeUrl),
invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch))
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
}
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
github.com/coreos/go-systemd/v22 v22.1.0 h1:kq/SbG2BCKLkDKkjQf5OWwKWUKj1lgs3lFI4PxnR5lg=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
Expand Down

0 comments on commit 912e120

Please sign in to comment.