Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes issue where a CE response is truncated #6758

Merged
merged 2 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions pkg/channel/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,30 +239,28 @@ func (d *MessageDispatcherImpl) executeRequest(ctx context.Context,
}
execInfo.Time = dispatchTime

body := make([]byte, attributes.KnativeErrorDataExtensionMaxLength)
body := new(bytes.Buffer)
_, readErr := body.ReadFrom(response.Body)

if isFailure(response.StatusCode) {
// Read response body into execInfo for failures
readLen, err := response.Body.Read(body)
if err != nil && err != io.EOF {
d.logger.Error("failed to read response body into DispatchExecutionInfo", zap.Error(err))
if readErr != nil && readErr != io.EOF {
d.logger.Error("failed to read response body", zap.Error(err))
execInfo.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error()))
} else {
execInfo.ResponseBody = body[:readLen]
execInfo.ResponseBody = body.Bytes()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you still want to truncate the payload here in error cases?

(I suspect the right thing to do may actually be to truncate the overall message, preferring to cut payload over other attributes.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't check if the response is a cloud event in error cases. The payload gets truncated in a later step by a transformer and then put in as a cloud event extension on the original message. It's then sent to the dead letter. So I think the truncation still happens, it's just no longer part of the executeRequest

I'll double check the above and add a conformance test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll double check the above and add a conformance test

@gab-satchi Did you add some conformance test for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matzew, Just an e2e test right now. I can add one under here but if we're looking for one that could be reused by other implementations then that seems like a bigger effort.

I'm seeing all the delivery based ones just TODO-ed here

}
_ = response.Body.Close()
// Reject non-successful responses.
return ctx, nil, nil, &execInfo, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", response.StatusCode)
}

var responseMessageBody []byte
// Read response body into responseMessage for message accepted
readLen, err := response.Body.Read(body)
if err != nil && err != io.EOF {
d.logger.Error("failed to read response body into cloudevents' Message", zap.Error(err))
if readErr != nil && readErr != io.EOF {
d.logger.Error("failed to read response body", zap.Error(err))
responseMessageBody = []byte(fmt.Sprintf("Failed to read response body: %s", err.Error()))
} else {
responseMessageBody = body[:readLen]
responseMessageBody = body.Bytes()
}
responseMessage := http.NewMessage(response.Header, io.NopCloser(bytes.NewReader(responseMessageBody)))

Expand Down
63 changes: 63 additions & 0 deletions pkg/channel/message_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,69 @@ func TestDispatchMessage(t *testing.T) {
},
lastReceiver: "deadLetter",
},
"no restriction on message response size": {
sendToDestination: true,
sendToReply: true,
hasDeadLetterSink: false,
header: map[string][]string{
// do-not-forward should not get forwarded.
"do-not-forward": {"header"},
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"knative-2": {"knative-2-value"},
},
body: "destination",
eventExtensions: map[string]string{
"abc": `"ce-abc-value"`,
},
expectedDestRequest: &requestValidation{
Headers: map[string][]string{
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"knative-2": {"knative-2-value"},
"prefer": {"reply"},
"traceparent": {"ignored-value-header"},
"ce-abc": {`"ce-abc-value"`},
"ce-id": {"ignored-value-header"},
"ce-time": {"2002-10-02T15:00:00Z"},
"ce-source": {testCeSource},
"ce-type": {testCeType},
"ce-specversion": {cloudevents.VersionV1},
},
Body: `"destination"`,
},
expectedReplyRequest: &requestValidation{
Headers: map[string][]string{
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"traceparent": {"ignored-value-header"},
"ce-abc": {`"ce-abc-value"`},
"ce-id": {"ignored-value-header"},
"ce-time": {"2002-10-02T15:00:00Z"},
"ce-source": {testCeSource},
"ce-type": {testCeType},
"ce-specversion": {cloudevents.VersionV1},
},
Body: strings.Repeat("a", 2000),
},
fakeResponse: &http.Response{
StatusCode: http.StatusAccepted,
Header: map[string][]string{
"do-not-passthrough": {"no"},
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"ce-abc": {`"ce-abc-value"`},
"ce-id": {"ignored-value-header"},
"ce-time": {"2002-10-02T15:00:00Z"},
"ce-source": {testCeSource},
"ce-type": {testCeType},
"ce-specversion": {cloudevents.VersionV1},
},
Body: io.NopCloser(bytes.NewBufferString(strings.Repeat("a", 2000))),
},

lastReceiver: "reply",
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
Expand Down