From dd2fe3350458d03f36e74c2c55af95f1baf25565 Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Fri, 8 Nov 2024 18:26:42 -0500 Subject: [PATCH 1/3] Add support for gRPC-Web text encoding This adds support for gRPC-Web's text encoding. Only client support is added. Encoding of request and response body is handled to support text streams. Signed-off-by: Edward McFarlane --- protocol.go | 7 ++ protocol_grpc.go | 206 ++++++++++++++++++++++++++++++++++++++- protocol_grpc_test.go | 75 ++++++++++++++ protocol_rest.go | 3 + transcoder.go | 6 +- vanguard_rpcxrpc_test.go | 159 ++++++++++++++++++++++++++++++ 6 files changed, 452 insertions(+), 4 deletions(-) diff --git a/protocol.go b/protocol.go index ec82441..cdea795 100644 --- a/protocol.go +++ b/protocol.go @@ -46,6 +46,11 @@ const ( // streaming endpoints. However, bidirectional streams are only supported // when combined with HTTP/2. ProtocolGRPCWeb + // ProtocolGRPCWebText is a variant of ProtocolGRPCWeb that uses base64 + // text encoding for the request and response bodies. + // + // This protocol is not supported on the server side. + ProtocolGRPCWebText // ProtocolREST indicates the REST+JSON protocol. This protocol often // requires non-trivial transformations between HTTP requests and responses // and Protobuf request and response messages. @@ -96,6 +101,8 @@ func (p Protocol) serverHandler(op *operation) serverProtocolHandler { return grpcServerProtocol{} case ProtocolGRPCWeb: return grpcWebServerProtocol{} + case ProtocolGRPCWebText: + return nil // gRPC-Web Text is not supported on the server. case ProtocolREST: return restServerProtocol{} default: diff --git a/protocol_grpc.go b/protocol_grpc.go index 7cea306..1b9c9c3 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -16,6 +16,7 @@ package vanguard import ( "bytes" + "encoding/base64" "encoding/binary" "errors" "fmt" @@ -151,7 +152,7 @@ func (g grpcServerProtocol) String() string { return g.protocol().String() } -// grpcClientProtocol implements the gRPC protocol for +// grpcWebClientProtocol implements the gRPC-Web protocol for // processing RPCs received from the client. type grpcWebClientProtocol struct{} @@ -212,7 +213,7 @@ func (g grpcWebClientProtocol) String() string { return g.protocol().String() } -// grpcServerProtocol implements the gRPC-Web protocol for +// grpcWebServerProtocol implements the gRPC-Web protocol for // sending RPCs to the server handler. type grpcWebServerProtocol struct{} @@ -278,6 +279,73 @@ func (g grpcWebServerProtocol) String() string { return g.protocol().String() } +// grpcWebTextClientProtocol implements the gRPC-Web protocol for +// processing RPCs received from the client. +type grpcWebTextClientProtocol struct{} + +var _ clientProtocolHandler = grpcWebTextClientProtocol{} +var _ clientBodyPreparer = grpcWebTextClientProtocol{} +var _ envelopedProtocolHandler = grpcWebTextClientProtocol{} + +func (g grpcWebTextClientProtocol) protocol() Protocol { + return ProtocolGRPCWebText +} + +func (g grpcWebTextClientProtocol) acceptsStreamType(_ *operation, _ connect.StreamType) bool { + return true +} + +func (g grpcWebTextClientProtocol) requestNeedsPrep(op *operation) bool { + // Hijack the request and response body to handle base64 encoding/decoding. + op.request.Body = struct { + io.Reader + io.Closer + }{ + Reader: newGRPCWebTextReader(op.request.Body), + Closer: op.request.Body, + } + op.writer = newGRPCWebTextResponseWriter(op.writer) + return false +} + +func (g grpcWebTextClientProtocol) prepareUnmarshalledRequest(_ *operation, _ []byte, _ proto.Message) error { + // requestNeedsPrep always returns false. + return errors.New("gRPC-Web text prepareUnmarshalledRequest not implemented") +} + +func (g grpcWebTextClientProtocol) responseNeedsPrep(_ *operation) bool { + return false // Setup in requestNeedsPrep. +} + +func (g grpcWebTextClientProtocol) prepareMarshalledResponse(_ *operation, _ []byte, _ proto.Message, _ http.Header) ([]byte, error) { + // responseNeedsPrep always returns false. + return nil, errors.New("gRPC-Web text prepareMarshalledResponse not implemented") +} + +func (g grpcWebTextClientProtocol) extractProtocolRequestHeaders(_ *operation, headers http.Header) (requestMeta, error) { + return grpcExtractRequestMeta("application/grpc-web-text", "application/grpc-web-text+", headers) +} + +func (g grpcWebTextClientProtocol) addProtocolResponseHeaders(meta responseMeta, headers http.Header) int { + return grpcAddResponseMeta("application/grpc-web-text+", meta, headers) +} + +func (g grpcWebTextClientProtocol) encodeEnd(op *operation, end *responseEnd, writer io.Writer, wasInHeaders bool) http.Header { + return grpcWebClientProtocol{}.encodeEnd(op, end, writer, wasInHeaders) +} + +func (g grpcWebTextClientProtocol) decodeEnvelope(bytes envelopeBytes) (envelope, error) { + return grpcServerProtocol{}.decodeEnvelope(bytes) +} + +func (g grpcWebTextClientProtocol) encodeEnvelope(env envelope) envelopeBytes { + return grpcWebClientProtocol{}.encodeEnvelope(env) +} + +func (g grpcWebTextClientProtocol) String() string { + return g.protocol().String() +} + func grpcExtractRequestMeta(contentTypeShort, contentTypePrefix string, headers http.Header) (requestMeta, error) { var reqMeta requestMeta if err := grpcExtractTimeoutFromHeaders(headers, &reqMeta); err != nil { @@ -641,3 +709,137 @@ func grpcTimeoutUnitLookup(unit byte) time.Duration { return 0 } } + +// grpcWebTextResponseWriter wraps an http.ResponseWriter and base64-encodes +// the response body for grpc-web-text. +type grpcWebTextResponseWriter struct { + http.ResponseWriter + + encoder io.WriteCloser +} + +// newGRPCWebTextResponseWriter creates a new grpcWebTextResponseWriter. +func newGRPCWebTextResponseWriter(w http.ResponseWriter) *grpcWebTextResponseWriter { + return &grpcWebTextResponseWriter{ + ResponseWriter: w, + } +} + +func (w *grpcWebTextResponseWriter) Write(p []byte) (int, error) { + if w.encoder == nil { + w.encoder = base64.NewEncoder(base64.StdEncoding, w.ResponseWriter) + } + return w.encoder.Write(p) +} + +func (w *grpcWebTextResponseWriter) Flush() { + // Close the base64 encoder to flush any remaining data. This may be + // called multiple times as needed, padding is output on Close. + // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md + if w.encoder != nil { + _ = w.encoder.Close() + w.encoder = nil + } + if flusher, ok := w.ResponseWriter.(http.Flusher); ok { + flusher.Flush() + } +} + +// Unwrap returns the underlying http.ResponseWriter. +func (w grpcWebTextResponseWriter) Unwrap() http.ResponseWriter { + return w.ResponseWriter +} + +// grpcWebTextReader wraps an io.Reader and base64-decodes the response body +// for grpc-web-text. +type grpcWebTextReader struct { + delegate io.Reader + start, end int + inputBuffer [512]byte + outputBuffer [384]byte + output []byte +} + +// newGRPCWebTextReader creates a new grpcWebTextReader. +func newGRPCWebTextReader(r io.Reader) *grpcWebTextReader { + return &grpcWebTextReader{ + delegate: r, + } +} + +// Read reads base64-encoded data from the underlying reader and decodes it. +// The reader handles padding characters within the stream. It will ensure that +// padding is always at the end of a chunk of data when processing the chunk. +func (r *grpcWebTextReader) Read(dst []byte) (int, error) { + if len(dst) == 0 { + return 0, nil + } + if len(r.output) > 0 { + n := copy(dst, r.output) + r.output = r.output[n:] + return n, nil + } + // Read from the stream in 4-byte tokens. + for r.end-r.start < 4 { + size, err := r.readWithoutNewlines(r.inputBuffer[r.end:]) + if size == 0 { + if err == nil { + err = io.ErrNoProgress + } else if errors.Is(err, io.EOF) && r.end > r.start { + // Non 4-byte chunk at the end of the stream. + err = io.ErrUnexpectedEOF + } + return 0, err + } + r.end += size + } + // Decode the next chunk of data. + length := ((r.end - r.start) / 4) * 4 + dstLength := base64.StdEncoding.EncodedLen(len(r.outputBuffer)) + chunkLength := min(dstLength, length) + + // If we have padding, we split the stream at the padding and decode the + // chunk up to the padding. + if index := bytes.IndexRune(r.inputBuffer[r.start:r.end], base64.StdPadding); index != -1 { + chunkOffset := ((index + 4) / 4) * 4 + chunkLength = min(chunkLength, chunkOffset) + } + output := r.outputBuffer[:] + input := r.inputBuffer[r.start : r.start+chunkLength] + size, err := base64.StdEncoding.Decode(output, input) + if err != nil { + return 0, err + } + r.start += chunkLength + if r.start == r.end { + r.start, r.end = 0, 0 + } + r.output = output[:size] + size = copy(dst, r.output) + r.output = r.output[size:] + return size, err +} + +// readWithoutNewlines reads from the underlying reader, skipping over any +// newline characters in the buffer. This follows the behavior of +// base64.NewDecoder. +func (r *grpcWebTextReader) readWithoutNewlines(dst []byte) (n int, err error) { + n, err = r.delegate.Read(dst) + for n > 0 { + offset := 0 + for i, b := range dst[:n] { + if b != '\r' && b != '\n' { + if i != offset { + dst[offset] = b + } + offset++ + } + } + if offset > 0 { + return offset, err + } + // Previous buffer entirely whitespace, read again. + n, err = r.delegate.Read(dst) + } + return n, err +} diff --git a/protocol_grpc_test.go b/protocol_grpc_test.go index 8336e9b..6efa1af 100644 --- a/protocol_grpc_test.go +++ b/protocol_grpc_test.go @@ -17,8 +17,10 @@ package vanguard import ( "errors" "fmt" + "io" "math" "net/http/httptest" + "strings" "testing" "testing/quick" "time" @@ -167,3 +169,76 @@ func compareErrors(t *testing.T, got, want *connect.Error) { } } } + +func TestGRPCWebTextResponseWriter(t *testing.T) { + t.Parallel() + + rec := httptest.NewRecorder() + writer := newGRPCWebTextResponseWriter(rec) + writer.Header().Set("Content-Type", "application/grpc-web-text+proto") + _, err := writer.Write([]byte("Hello, 世界")) + require.NoError(t, err) + writer.Flush() + _, err = writer.Write([]byte("Hello, 世界")) + require.NoError(t, err) + writer.Flush() + + assert.Equal(t, "SGVsbG8sIOS4lueVjA==SGVsbG8sIOS4lueVjA==", rec.Body.String()) + assert.Equal(t, "application/grpc-web-text+proto", rec.Header().Get("Content-Type")) + + out, err := io.ReadAll(newGRPCWebTextReader(strings.NewReader(rec.Body.String()))) + require.NoError(t, err) + assert.Equal(t, "Hello, 世界Hello, 世界", string(out)) +} + +func TestGRPCWebTextReader(t *testing.T) { + t.Parallel() + for _, test := range []struct { + name, input, output string + }{ + {"hello", "SGVsbG8sIOS4lueVjA==", "Hello, 世界"}, + {"hello_duplicate", "SGVsbG8sIOS4lueVjA==SGVsbG8sIOS4lueVjA==", "Hello, 世界Hello, 世界"}, + {"some_data", "c29tZSBkYXRhIHdpdGggACBhbmQg77u/", "some data with \x00 and \ufeff"}, + {"ab", "QQ==Qg==", "AB"}, + {"a_b", "Q\nQ=\r=Qg=\r=", "AB"}, + { + "foobar", + "Zg==" + "Zm8=" + "Zm9v" + "Zm9vYg==" + "Zm9vYmE=" + "Zm9vYmFy", + "f" + "fo" + "foo" + "foob" + "fooba" + "foobar", + }, + { + "RFC3548", + "FPucA9l+" + "FPucA9k=" + "FPucAw==", + "\x14\xfb\x9c\x03\xd9\x7e" + "\x14\xfb\x9c\x03\xd9" + "\x14\xfb\x9c\x03", + }, + { + "wikipedia", + "c3VyZS4=" + "c3VyZQ==" + "c3Vy" + "c3U=" + "bGVhc3VyZS4=" + "ZWFzdXJlLg==" + "YXN1cmUu" + "c3VyZS4=", + "sure." + "sure" + "sur" + "su" + "leasure." + "easure." + "asure." + "sure.", + }, + } { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + decoder := newGRPCWebTextReader(strings.NewReader(test.input)) + b, err := io.ReadAll(decoder) + require.NoError(t, err) + output := string(b) + assert.Equal(t, test.output, output) + }) + } + t.Run("partial_reads", func(t *testing.T) { + var buf [5]byte + decoder := newGRPCWebTextReader(strings.NewReader("SGVsbG8sIOS4lueVjA==SGVsbG8sIOS4lueVjA==")) + total := 0 + for { + n, err := decoder.Read(buf[:]) + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + total += n + } + assert.Equal(t, len("Hello, 世界Hello, 世界"), total) + }) +} diff --git a/protocol_rest.go b/protocol_rest.go index 110ec8c..f05425b 100644 --- a/protocol_rest.go +++ b/protocol_rest.go @@ -437,6 +437,9 @@ func restHTTPBodyRequest(op *operation) bool { } func restHTTPBodyResponse(op *operation) bool { + if op.restTarget == nil { + return false + } return restIsHTTPBody(op.methodConf.descriptor.Output(), op.restTarget.responseBodyFields) } diff --git a/transcoder.go b/transcoder.go index 500c66e..df12580 100644 --- a/transcoder.go +++ b/transcoder.go @@ -312,6 +312,8 @@ func classifyRequest(req *http.Request) (clientProtocolHandler, url.Values) { return grpcClientProtocol{}, nil case contentType == "application/grpc-web" || strings.HasPrefix(contentType, "application/grpc-web+"): return grpcWebClientProtocol{}, nil + case contentType == "application/grpc-web-text" || strings.HasPrefix(contentType, "application/grpc-web-text+"): + return grpcWebTextClientProtocol{}, nil case strings.HasPrefix(contentType, "application/"): connectVersion := req.Header["Connect-Protocol-Version"] if len(connectVersion) == 1 && connectVersion[0] == "1" { @@ -2177,10 +2179,10 @@ func asFlusher(respWriter http.ResponseWriter) http.Flusher { // we can't use that since it isn't available prior to Go 1.21. for { switch typedWriter := respWriter.(type) { - case http.Flusher: - return typedWriter case errorFlusher: return flusherNoError{f: typedWriter} + case http.Flusher: + return typedWriter case interface{ Unwrap() http.ResponseWriter }: respWriter = typedWriter.Unwrap() default: diff --git a/vanguard_rpcxrpc_test.go b/vanguard_rpcxrpc_test.go index 5b8df24..7fb525f 100644 --- a/vanguard_rpcxrpc_test.go +++ b/vanguard_rpcxrpc_test.go @@ -16,7 +16,10 @@ package vanguard import ( "context" + "encoding/base64" + "encoding/binary" "fmt" + "io" "net/http" "net/http/httptest" "net/http/httputil" @@ -416,3 +419,159 @@ func TestMux_RPCxRPC(t *testing.T) { }) } } + +func Test_grpcWebText(t *testing.T) { + t.Parallel() + + var interceptor testInterceptor + serveMux := http.NewServeMux() + serveMux.Handle(testv1connect.NewContentServiceHandler( + testv1connect.UnimplementedContentServiceHandler{}, + connect.WithInterceptors(&interceptor), + )) + + svcHandler := protocolAssertMiddleware( + ProtocolConnect, CodecProto, CompressionIdentity, serveMux, + ) + services := []*Service{ + NewService( + testv1connect.ContentServiceName, + svcHandler, + WithNoTargetCompression(), + ), + } + handler, err := NewTranscoder(services) + require.NoError(t, err) + server := httptest.NewUnstartedServer(handler) + server.EnableHTTP2 = true + server.StartTLS() + disableCompression(server) + t.Cleanup(server.Close) + + grpcWebClient := grpcWebTextHijackClient{client: server.Client()} + client := testv1connect.NewContentServiceClient( + grpcWebClient, server.URL, connect.WithGRPCWeb(), + connect.WithAcceptCompression("gzip", nil, nil), + ) + + ctx := context.Background() + invoke := func( + client testv1connect.ContentServiceClient, + header http.Header, + msgs []proto.Message, + ) (http.Header, []proto.Message, http.Header, error) { + return outputFromBidiStream(ctx, client.Subscribe, header, msgs) + } + stream := testStream{ + method: testv1connect.ContentServiceSubscribeProcedure, + reqHeader: http.Header{"Message": []string{"hello"}}, + rspHeader: http.Header{"Message": []string{"world"}}, + msgs: []testMsg{ + {in: &testMsgIn{ + msg: &testv1.SubscribeRequest{FilenamePatterns: []string{"xyz.*", "abc*.jpg"}}, + }}, + {out: &testMsgOut{ + msg: &testv1.SubscribeResponse{ + FilenameChanged: "xyz1.foo", + }, + }}, + {out: &testMsgOut{ + msg: &testv1.SubscribeResponse{ + FilenameChanged: "xyz2.foo", + }, + }}, + {in: &testMsgIn{ + msg: &testv1.SubscribeRequest{FilenamePatterns: []string{"test.test"}}, + }}, + {out: &testMsgOut{ + msg: &testv1.SubscribeResponse{ + FilenameChanged: "test.test", + }, + }}, + }, + rspTrailer: http.Header{"Trailer-Val": []string{"end"}}, + } + for i, v := range stream.msgs { + var msg proto.Message + if v.in != nil { + msg = v.in.msg + } else { + msg = v.out.msg + } + t.Log(i, proto.Size(msg)) + } + runRPCTestCase(t, &interceptor, client, invoke, stream) +} + +// grpcWebTextHijackClient converts a grpcWeb client to a grpcWebText client. +type grpcWebTextHijackClient struct { + client connect.HTTPClient +} + +func (c grpcWebTextHijackClient) Do(req *http.Request) (*http.Response, error) { + if req.Method != http.MethodPost { + return nil, fmt.Errorf("unexpected method: %s", req.Method) + } + if contentType := req.Header.Get("Content-Type"); contentType != "application/grpc-web+proto" { + return nil, fmt.Errorf("unexpected content type: %s", contentType) + } + req.Header.Set("Content-Type", "application/grpc-web-text+proto") + req.Header.Del("Content-Length") + req.Header.Del("Grpc-Accept-Encoding") + requestBody := req.Body + pipeReader, pipeWriter := io.Pipe() + go func() { + var err error + for { + var env [5]byte + if _, err = io.ReadFull(requestBody, env[:]); err != nil { + break + } + size := binary.BigEndian.Uint32(env[1:]) + buf := make([]byte, size) + if _, err = io.ReadFull(requestBody, buf); err != nil { + break + } + enc := base64.NewEncoder(base64.StdEncoding, pipeWriter) + if _, err = enc.Write(env[:]); err != nil { + break + } + if _, err := enc.Write(buf); err != nil { + break + } + if err = enc.Close(); err != nil { + break + } + } + pipeWriter.CloseWithError(err) + }() + req.Body = struct { + io.Reader + io.Closer + }{ + Reader: pipeReader, + Closer: requestBody, + } + + rsp, err := c.client.Do(req) + if err != nil { + return nil, err + } + + contentType := rsp.Header.Get("Content-Type") + if !strings.HasPrefix(contentType, "application/grpc-web-text+proto") { + return rsp, nil // Unknown content type, return as is. + } + + rsp.Header.Set("Content-Type", "application/grpc-web+proto") + rsp.Header.Del("Content-Length") + rspBody := rsp.Body + rsp.Body = struct { + io.Reader + io.Closer + }{ + Reader: newGRPCWebTextReader(rspBody), + Closer: rspBody, + } + return rsp, nil +} From 6686432e0d24ef272b283bf3270c35c0f967aa3a Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Mon, 11 Nov 2024 10:19:41 -0500 Subject: [PATCH 2/3] Reduce chunk seek Signed-off-by: Edward McFarlane --- protocol_grpc.go | 15 +++++++-------- vanguard_rpcxrpc_test.go | 9 --------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/protocol_grpc.go b/protocol_grpc.go index 1b9c9c3..dc9c1c0 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -775,9 +775,9 @@ func (r *grpcWebTextReader) Read(dst []byte) (int, error) { return 0, nil } if len(r.output) > 0 { - n := copy(dst, r.output) - r.output = r.output[n:] - return n, nil + size := copy(dst, r.output) + r.output = r.output[size:] + return size, nil } // Read from the stream in 4-byte tokens. for r.end-r.start < 4 { @@ -797,15 +797,14 @@ func (r *grpcWebTextReader) Read(dst []byte) (int, error) { length := ((r.end - r.start) / 4) * 4 dstLength := base64.StdEncoding.EncodedLen(len(r.outputBuffer)) chunkLength := min(dstLength, length) - + input := r.inputBuffer[r.start : r.start+chunkLength] // If we have padding, we split the stream at the padding and decode the // chunk up to the padding. - if index := bytes.IndexRune(r.inputBuffer[r.start:r.end], base64.StdPadding); index != -1 { - chunkOffset := ((index + 4) / 4) * 4 - chunkLength = min(chunkLength, chunkOffset) + if index := bytes.IndexRune(input, base64.StdPadding); index != -1 { + chunkLength = ((index + 4) / 4) * 4 + input = input[:chunkLength] } output := r.outputBuffer[:] - input := r.inputBuffer[r.start : r.start+chunkLength] size, err := base64.StdEncoding.Decode(output, input) if err != nil { return 0, err diff --git a/vanguard_rpcxrpc_test.go b/vanguard_rpcxrpc_test.go index 7fb525f..b081b86 100644 --- a/vanguard_rpcxrpc_test.go +++ b/vanguard_rpcxrpc_test.go @@ -491,15 +491,6 @@ func Test_grpcWebText(t *testing.T) { }, rspTrailer: http.Header{"Trailer-Val": []string{"end"}}, } - for i, v := range stream.msgs { - var msg proto.Message - if v.in != nil { - msg = v.in.msg - } else { - msg = v.out.msg - } - t.Log(i, proto.Size(msg)) - } runRPCTestCase(t, &interceptor, client, invoke, stream) } From 1daeebb2c335fe7174764203d3dc24a853471e5e Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Mon, 25 Nov 2024 17:01:27 -0500 Subject: [PATCH 3/3] Ensure response writer newline delimits on flush Signed-off-by: Edward McFarlane --- protocol_grpc.go | 3 +++ protocol_grpc_test.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/protocol_grpc.go b/protocol_grpc.go index dc9c1c0..cc9e7d2 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -740,6 +740,9 @@ func (w *grpcWebTextResponseWriter) Flush() { _ = w.encoder.Close() w.encoder = nil } + // Some clients may expect a newline after each message. This does not + // affect the base64 encoding. + _, _ = w.ResponseWriter.Write([]byte{'\n'}) if flusher, ok := w.ResponseWriter.(http.Flusher); ok { flusher.Flush() } diff --git a/protocol_grpc_test.go b/protocol_grpc_test.go index 6efa1af..2d04e3e 100644 --- a/protocol_grpc_test.go +++ b/protocol_grpc_test.go @@ -183,7 +183,7 @@ func TestGRPCWebTextResponseWriter(t *testing.T) { require.NoError(t, err) writer.Flush() - assert.Equal(t, "SGVsbG8sIOS4lueVjA==SGVsbG8sIOS4lueVjA==", rec.Body.String()) + assert.Equal(t, "SGVsbG8sIOS4lueVjA==\nSGVsbG8sIOS4lueVjA==\n", rec.Body.String()) assert.Equal(t, "application/grpc-web-text+proto", rec.Header().Get("Content-Type")) out, err := io.ReadAll(newGRPCWebTextReader(strings.NewReader(rec.Body.String())))