From 28341d171dd4c1a52f46371ddfb5fd2240b79731 Mon Sep 17 00:00:00 2001 From: Michal Witkowski Date: Fri, 23 Oct 2015 08:30:24 +0100 Subject: [PATCH] move out forward logic to method, allowing for use as `grpc.Server` not found handler. --- proxy.go | 91 ++++++++++++++++++++++++++++++-------------------------- 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/proxy.go b/proxy.go index d20f4a9..f282498 100644 --- a/proxy.go +++ b/proxy.go @@ -13,44 +13,44 @@ // limitations under the License. package proxy + import ( - "net" "fmt" "io" + "net" "strings" "sync" + "golang.org/x/net/context" "google.golang.org/grpc" - "google.golang.org/grpc/transport" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/codes" - "golang.org/x/net/context" + "google.golang.org/grpc/transport" ) - // transportWriter is a common interface between gRPC transport.ServerTransport and transport.ClientTransport. type transportWriter interface { Write(s *transport.Stream, data []byte, opts *transport.Options) error } type Proxy struct { - mu sync.Mutex - lis map[net.Listener]bool - conns map[transport.ServerTransport]bool - logger grpclog.Logger + mu sync.Mutex + lis map[net.Listener]bool + conns map[transport.ServerTransport]bool + logger grpclog.Logger director StreamDirector - opts *options + opts *options } // NewServer creates a gRPC proxy which will use the `StreamDirector` for making routing decisions. func NewServer(director StreamDirector, opt ...ProxyOption) *Proxy { s := &Proxy{ - lis: make(map[net.Listener]bool), - conns: make(map[transport.ServerTransport]bool), - opts: &options{}, + lis: make(map[net.Listener]bool), + conns: make(map[transport.ServerTransport]bool), + opts: &options{}, director: director, - logger: &defaultLogger{}, + logger: &defaultLogger{}, } for _, o := range opt { o(s.opts) @@ -138,17 +138,39 @@ func (s *Proxy) handleStream(frontTrans transport.ServerTransport, frontStream * } return } + ProxyStream(s.director, s.logger, frontTrans, frontStream) + +} - backendTrans, backendStream, err := s.backendTransportStream(frontStream.Context()) +// Stop stops the gRPC server. Once Stop returns, the server stops accepting +// connection requests and closes all the connected connections. +func (s *Proxy) Stop() { + s.mu.Lock() + listeners := s.lis + s.lis = nil + cs := s.conns + s.conns = nil + s.mu.Unlock() + for lis := range listeners { + lis.Close() + } + for c := range cs { + c.Close() + } +} + +// ProxyStream performs a forward of a gRPC frontend stream to a backend. +func ProxyStream(director StreamDirector, logger grpclog.Logger, frontTrans transport.ServerTransport, frontStream *transport.Stream) { + backendTrans, backendStream, err := backendTransportStream(director, frontStream.Context()) if err != nil { frontTrans.WriteStatus(frontStream, grpc.Code(err), grpc.ErrorDesc(err)) - s.logger.Printf("proxy: Proxy.handleStream %v failed to allocate backend: %v", frontStream.Method(), err) + logger.Printf("proxy: Proxy.handleStream %v failed to allocate backend: %v", frontStream.Method(), err) return } defer backendTrans.CloseStream(backendStream, nil) // data coming from client call to backend - ingressPathChan := s.forwardDataFrames(frontStream, backendStream, backendTrans) + ingressPathChan := forwardDataFrames(frontStream, backendStream, backendTrans) // custom header handling *must* be after some data is processed by the backend, otherwise there's a deadlock headerMd, err := backendStream.Header() @@ -156,13 +178,13 @@ func (s *Proxy) handleStream(frontTrans transport.ServerTransport, frontStream * frontTrans.WriteHeader(frontStream, headerMd) } // data coming from backend back to client call - egressPathChan := s.forwardDataFrames(backendStream, frontStream, frontTrans) + egressPathChan := forwardDataFrames(backendStream, frontStream, frontTrans) // wait for both data streams to complete. - egressErr := <- egressPathChan - ingressErr := <- ingressPathChan + egressErr := <-egressPathChan + ingressErr := <-ingressPathChan if egressErr != nil || ingressErr != nil { - s.logger.Printf("proxy: Proxy.handleStream %v failure during transfer ingres: %v egress: %v", frontStream.Method(), ingressErr, egressErr) + logger.Printf("proxy: Proxy.handleStream %v failure during transfer ingres: %v egress: %v", frontStream.Method(), ingressErr, egressErr) frontTrans.WriteStatus(frontStream, codes.Unavailable, fmt.Sprintf("problem in transfer ingress: %v egress: %v", ingressErr, egressErr)) return } @@ -175,8 +197,8 @@ func (s *Proxy) handleStream(frontTrans transport.ServerTransport, frontStream * } // backendTransportStream picks and establishes a Stream to the backend. -func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTransport, *transport.Stream, error) { - grpcConn, err := s.director(ctx) +func backendTransportStream(director StreamDirector, ctx context.Context) (transport.ClientTransport, *transport.Stream, error) { + grpcConn, err := director(ctx) if err != nil { if grpc.Code(err) != codes.Unknown { // rpcError check return nil, nil, err @@ -189,7 +211,7 @@ func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTra frontendStream, _ := transport.StreamFromContext(ctx) callHdr := &transport.CallHdr{ Method: frontendStream.Method(), - Host: "TODOFIXTLS", // TODO(michal): This can fail if the backend server is using TLS Hostname verification. Use conn.authority, once it's public? + Host: "TODOFIXTLS", // TODO(michal): This can fail if the backend server is using TLS Hostname verification. Use conn.authority, once it's public? } backendStream, err := backendTrans.NewStream(ctx, callHdr) if err != nil { @@ -200,10 +222,10 @@ func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTra // forwardDataFrames moves data from one gRPC transport `Stream` to another in async fashion. // It returns an error channel. `nil` on it signifies everything was fine, anything else is a serious problem. -func (s *Proxy) forwardDataFrames(srcStream *transport.Stream, dstStream *transport.Stream, dstTransport transportWriter) chan error { +func forwardDataFrames(srcStream *transport.Stream, dstStream *transport.Stream, dstTransport transportWriter) chan error { ret := make(chan error) - go func () { + go func() { data := make([]byte, 4096) opt := &transport.Options{} for { @@ -227,22 +249,5 @@ func (s *Proxy) forwardDataFrames(srcStream *transport.Stream, dstStream *transp close(ret) }() return ret - } -// Stop stops the gRPC server. Once Stop returns, the server stops accepting -// connection requests and closes all the connected connections. -func (s *Proxy) Stop() { - s.mu.Lock() - listeners := s.lis - s.lis = nil - cs := s.conns - s.conns = nil - s.mu.Unlock() - for lis := range listeners { - lis.Close() - } - for c := range cs { - c.Close() - } -} \ No newline at end of file