Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of gRPC internals & tests and introducing the reflectMetadata #3343

Merged
merged 4 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 69 additions & 34 deletions js/modules/k6/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,13 @@ func buildTLSConfigFromMap(parentConfig *tls.Config, tlsConfigMap map[string]int
}

// Connect is a block dial to the gRPC server at the given address (host:port)
func (c *Client) Connect(addr string, params map[string]interface{}) (bool, error) {
func (c *Client) Connect(addr string, params goja.Value) (bool, error) {
state := c.vu.State()
if state == nil {
return false, common.NewInitContextError("connecting to a gRPC server in the init context is not supported")
}

p, err := c.parseConnectParams(params)
p, err := newConnectParams(c.vu.Runtime(), params)
if err != nil {
return false, fmt.Errorf("invalid grpc.connect() parameters: %w", err)
}
Expand Down Expand Up @@ -260,6 +260,9 @@ func (c *Client) Connect(addr string, params map[string]interface{}) (bool, erro
if !p.UseReflectionProtocol {
return true, nil
}

ctx = metadata.NewOutgoingContext(ctx, p.ReflectionMetadata)

fdset, err := c.conn.Reflect(ctx)
if err != nil {
return false, err
Expand Down Expand Up @@ -309,11 +312,6 @@ func (c *Client) Invoke(
return nil, fmt.Errorf("unable to serialise request object: %w", err)
}

md := metadata.New(nil)
for param, strval := range p.Metadata {
md.Append(param, strval)
}

ctx, cancel := context.WithTimeout(c.vu.Context(), p.Timeout)
defer cancel()

Expand All @@ -335,7 +333,7 @@ func (c *Client) Invoke(
TagsAndMeta: &p.TagsAndMeta,
}

return c.conn.Invoke(ctx, method, md, reqmsg)
return c.conn.Invoke(ctx, method, p.Metadata, reqmsg)
}

// Close will close the client gRPC connection
Expand Down Expand Up @@ -430,7 +428,7 @@ func (c *Client) convertToMethodInfo(fdset *descriptorpb.FileDescriptorSet) ([]M
}

type invokeParams struct {
Metadata map[string]string
Metadata metadata.MD
TagsAndMeta metrics.TagsAndMeta
Timeout time.Duration
}
Expand All @@ -439,6 +437,7 @@ func (c *Client) parseInvokeParams(paramsVal goja.Value) (*invokeParams, error)
result := &invokeParams{
Timeout: 1 * time.Minute,
TagsAndMeta: c.vu.State().Tags.GetCurrentValues(),
Metadata: metadata.New(nil),
}
if paramsVal == nil || goja.IsUndefined(paramsVal) || goja.IsNull(paramsVal) {
return result, nil
Expand All @@ -451,30 +450,12 @@ func (c *Client) parseInvokeParams(paramsVal goja.Value) (*invokeParams, error)
c.vu.State().Logger.Warn("The headers property is deprecated, replace it with the metadata property, please.")
fallthrough
case "metadata":
result.Metadata = make(map[string]string)
v := params.Get(k).Export()
rawHeaders, ok := v.(map[string]interface{})
if !ok {
return result, errors.New("metadata must be an object with key-value pairs")
}
for hk, kv := range rawHeaders {
// TODO(rogchap): Should we manage a string slice?
// The spec defines that Binary-valued keys end in -bin
// https://grpc.io/docs/what-is-grpc/core-concepts/#metadata
var strval string
if strings.HasSuffix(hk, "-bin") {
var binval []byte
binval, ok = kv.([]byte)
if !ok {
return result, fmt.Errorf("metadata %q value must be binary", hk)
}
// https://github.com/grpc/grpc-go/blob/v1.57.0/Documentation/grpc-metadata.md#storing-binary-data-in-metadata
strval = string(binval)
} else if strval, ok = kv.(string); !ok {
return result, fmt.Errorf("metadata %q value must be a string", hk)
}
result.Metadata[hk] = strval
md, err := newMetadata(params.Get(k))
if err != nil {
return result, fmt.Errorf("invalid metadata param: %w", err)
}

result.Metadata = md
case "tags":
if err := common.ApplyCustomUserTags(rt, &result.TagsAndMeta, params.Get(k)); err != nil {
return result, fmt.Errorf("metric tags: %w", err)
Expand All @@ -493,24 +474,72 @@ func (c *Client) parseInvokeParams(paramsVal goja.Value) (*invokeParams, error)
return result, nil
}

// newMetadata constructs a metadata.MD from the input value.
func newMetadata(input goja.Value) (metadata.MD, error) {
md := metadata.New(nil)

if common.IsNullish(input) {
return md, nil
}

v := input.Export()

raw, ok := v.(map[string]interface{})
if !ok {
return md, errors.New("must be an object with key-value pairs")
}

for hk, kv := range raw {
var val string
// The gRPC spec defines that Binary-valued keys end in -bin
// https://grpc.io/docs/what-is-grpc/core-concepts/#metadata
if strings.HasSuffix(hk, "-bin") {
var binVal []byte
if binVal, ok = kv.([]byte); !ok {
return md, fmt.Errorf("%q value must be binary", hk)
}

// https://github.com/grpc/grpc-go/blob/v1.57.0/Documentation/grpc-metadata.md#storing-binary-data-in-metadata
val = string(binVal)
} else if val, ok = kv.(string); !ok {
return md, fmt.Errorf("%q value must be a string", hk)
}

md.Append(hk, val)
}

return md, nil
}

type connectParams struct {
IsPlaintext bool
ReflectionMetadata metadata.MD
UseReflectionProtocol bool
Timeout time.Duration
MaxReceiveSize int64
MaxSendSize int64
TLS map[string]interface{}
}

func (c *Client) parseConnectParams(raw map[string]interface{}) (connectParams, error) {
func newConnectParams(rt *goja.Runtime, input goja.Value) (connectParams, error) { //nolint:funlen,gocognit,cyclop
params := connectParams{
IsPlaintext: false,
UseReflectionProtocol: false,
ReflectionMetadata: metadata.New(nil),
Timeout: time.Minute,
MaxReceiveSize: 0,
MaxSendSize: 0,
}
for k, v := range raw {

if common.IsNullish(input) {
return params, nil
}

raw := input.ToObject(rt)

for _, k := range raw.Keys() {
v := raw.Get(k).Export()

switch k {
case "plaintext":
var ok bool
Expand All @@ -530,6 +559,12 @@ func (c *Client) parseConnectParams(raw map[string]interface{}) (connectParams,
if !ok {
return params, fmt.Errorf("invalid reflect value: '%#v', it needs to be boolean", v)
}
case "reflectMetadata":
md, err := newMetadata(raw.Get(k))
if err != nil {
return params, fmt.Errorf("invalid reflectMetadata param: %w", err)
}
params.ReflectionMetadata = md
case "maxReceiveSize":
var ok bool
params.MaxReceiveSize, ok = v.(int64)
Expand Down
Loading