Skip to content

Commit ee1a7ea

Browse files
committed
THRIFT-5731: Handle ErrAbandonRequest automatically
Also add a test to verify the behavior. The test helped me to found a bug in TSimpleServer that didn't handle the ErrAbandonRequest case correctly, so fix the bug as well. client: go
1 parent 4ca47e0 commit ee1a7ea

File tree

4 files changed

+119
-9
lines changed

4 files changed

+119
-9
lines changed

compiler/cpp/src/thrift/generate/t_go_generator.cc

+14-5
Original file line numberDiff line numberDiff line change
@@ -2797,13 +2797,13 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
27972797
f_types_ << indent() << "if thrift.ServerConnectivityCheckInterval > 0 {" << endl;
27982798

27992799
indent_up();
2800-
f_types_ << indent() << "var cancel context.CancelFunc" << endl;
2801-
f_types_ << indent() << "ctx, cancel = context.WithCancel(ctx)" << endl;
2802-
f_types_ << indent() << "defer cancel()" << endl;
2800+
f_types_ << indent() << "var cancel context.CancelCauseFunc" << endl;
2801+
f_types_ << indent() << "ctx, cancel = context.WithCancelCause(ctx)" << endl;
2802+
f_types_ << indent() << "defer cancel(nil)" << endl;
28032803
f_types_ << indent() << "var tickerCtx context.Context" << endl;
28042804
f_types_ << indent() << "tickerCtx, tickerCancel = context.WithCancel(context.Background())" << endl;
28052805
f_types_ << indent() << "defer tickerCancel()" << endl;
2806-
f_types_ << indent() << "go func(ctx context.Context, cancel context.CancelFunc) {" << endl;
2806+
f_types_ << indent() << "go func(ctx context.Context, cancel context.CancelCauseFunc) {" << endl;
28072807

28082808
indent_up();
28092809
f_types_ << indent() << "ticker := time.NewTicker(thrift.ServerConnectivityCheckInterval)" << endl;
@@ -2821,7 +2821,7 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
28212821
indent_up();
28222822
f_types_ << indent() << "if !iprot.Transport().IsOpen() {" << endl;
28232823
indent_up();
2824-
f_types_ << indent() << "cancel()" << endl;
2824+
f_types_ << indent() << "cancel(thrift.ErrAbandonRequest)" << endl;
28252825
f_types_ << indent() << "return" << endl;
28262826
indent_down();
28272827
f_types_ << indent() << "}" << endl;
@@ -2901,6 +2901,15 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
29012901
f_types_ << indent() << "return false, thrift.WrapTException(err2)" << endl;
29022902
indent_down();
29032903
f_types_ << indent() << "}" << endl;
2904+
f_types_ << indent() << "if errors.Is(err2, context.Canceled) {" << endl;
2905+
indent_up();
2906+
f_types_ << indent() << "if err := context.Cause(ctx); errors.Is(err, thrift.ErrAbandonRequest) {" << endl;
2907+
indent_up();
2908+
f_types_ << indent() << "return false, thrift.WrapTException(err)" << endl;
2909+
indent_down();
2910+
f_types_ << indent() << "}" << endl;
2911+
indent_down();
2912+
f_types_ << indent() << "}" << endl;
29042913

29052914
string exc(tmp("_exc"));
29062915
f_types_ << indent() << exc << " := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "

lib/go/README.md

+9-3
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,19 @@ The context object passed into the server handler function will be canceled when
108108
the client closes the connection (this is a best effort check, not a guarantee
109109
-- there's no guarantee that the context object is always canceled when client
110110
closes the connection, but when it's canceled you can always assume the client
111-
closed the connection). When implementing Go Thrift server, you can take
112-
advantage of that to abandon requests that's no longer needed:
111+
closed the connection). The cause of the cancellation (via `context.Cause(ctx)`)
112+
would also be set to `thrift.ErrAbandonRequest`.
113+
114+
When implementing Go Thrift server, you can take advantage of that to abandon
115+
requests that's no longer needed by returning `thrift.ErrAbandonRequest`:
113116

114117
func MyEndpoint(ctx context.Context, req *thriftRequestType) (*thriftResponseType, error) {
115118
...
116119
if ctx.Err() == context.Canceled {
117120
return nil, thrift.ErrAbandonRequest
121+
// Or just return ctx.Err(), compiler generated processor code will
122+
// handle it for you automatically:
123+
// return nil, ctx.Err()
118124
}
119125
...
120126
}
@@ -155,4 +161,4 @@ will wait for all the client connections to be closed gracefully with
155161
zero err time. Otherwise, the stop will wait for all the client
156162
connections to be closed gracefully util thrift.ServerStopTimeout is
157163
reached, and client connections that are not closed after thrift.ServerStopTimeout
158-
will be closed abruptly which may cause some client errors.
164+
will be closed abruptly which may cause some client errors.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package tests
21+
22+
import (
23+
"context"
24+
"runtime/debug"
25+
"testing"
26+
"time"
27+
28+
"github.com/apache/thrift/lib/go/test/gopath/src/clientmiddlewareexceptiontest"
29+
"github.com/apache/thrift/lib/go/thrift"
30+
)
31+
32+
func TestServerConnectivityCheck(t *testing.T) {
33+
const (
34+
// Server will sleep for longer than client is willing to wait
35+
// so client will close the connection.
36+
serverSleep = 50 * time.Millisecond
37+
clientSocketTimeout = time.Millisecond
38+
)
39+
serverSocket, err := thrift.NewTServerSocket(":0")
40+
if err != nil {
41+
t.Fatalf("failed to create server socket: %v", err)
42+
}
43+
processor := clientmiddlewareexceptiontest.NewClientMiddlewareExceptionTestProcessor(fakeClientMiddlewareExceptionTestHandler(
44+
func(ctx context.Context) (*clientmiddlewareexceptiontest.FooResponse, error) {
45+
time.Sleep(serverSleep)
46+
err := ctx.Err()
47+
if err == nil {
48+
t.Error("Expected server ctx to be cancelled, did not happen")
49+
return new(clientmiddlewareexceptiontest.FooResponse), nil
50+
}
51+
return nil, err
52+
},
53+
))
54+
server := thrift.NewTSimpleServer2(processor, serverSocket)
55+
if err := server.Listen(); err != nil {
56+
t.Fatalf("failed to listen server: %v", err)
57+
}
58+
server.SetLogger(func(msg string) {
59+
t.Errorf("Server logger called with %q", msg)
60+
t.Errorf("Server logger callstack:\n%s", debug.Stack())
61+
})
62+
addr := serverSocket.Addr().String()
63+
go server.Serve()
64+
t.Cleanup(func() {
65+
server.Stop()
66+
})
67+
68+
cfg := &thrift.TConfiguration{
69+
SocketTimeout: clientSocketTimeout,
70+
}
71+
socket := thrift.NewTSocketConf(addr, cfg)
72+
if err := socket.Open(); err != nil {
73+
t.Fatalf("failed to create client connection: %v", err)
74+
}
75+
t.Cleanup(func() {
76+
socket.Close()
77+
})
78+
inProtocol := thrift.NewTBinaryProtocolConf(socket, cfg)
79+
outProtocol := thrift.NewTBinaryProtocolConf(socket, cfg)
80+
client := thrift.NewTStandardClient(inProtocol, outProtocol)
81+
ctx, cancel := context.WithTimeout(context.Background(), clientSocketTimeout)
82+
defer cancel()
83+
_, err = clientmiddlewareexceptiontest.NewClientMiddlewareExceptionTestClient(client).Foo(ctx)
84+
socket.Close()
85+
if err == nil {
86+
t.Error("Expected client to time out, did not happen")
87+
}
88+
}

lib/go/thrift/simple_server.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"errors"
2525
"fmt"
2626
"io"
27+
"net"
2728
"sync"
2829
"sync/atomic"
2930
"time"
@@ -354,7 +355,13 @@ func (p *TSimpleServer) processRequests(client TTransport) (err error) {
354355

355356
ok, err := processor.Process(ctx, inputProtocol, outputProtocol)
356357
if errors.Is(err, ErrAbandonRequest) {
357-
return client.Close()
358+
err := client.Close()
359+
if errors.Is(err, net.ErrClosed) {
360+
// In this case, it's kinda expected to get
361+
// net.ErrClosed, treat that as no-error
362+
return nil
363+
}
364+
return err
358365
}
359366
if errors.As(err, new(TTransportException)) && err != nil {
360367
return err

0 commit comments

Comments
 (0)