-
Notifications
You must be signed in to change notification settings - Fork 345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Encryption support ext consumer #612
Encryption support ext consumer #612
Conversation
- use base crypto package for encryption
- move it to Consumer MR
…Fanatics/pulsar-client-go into encryption-support-ext-producer
pulsar/impl_message.go
Outdated
@@ -215,6 +233,7 @@ type message struct { | |||
replicatedFrom string | |||
redeliveryCount uint32 | |||
schema Schema | |||
encryptionContext EncryptionContext |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this a pointer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pulsar/internal/crypto/decryptor.go
Outdated
// Decryptor support decrypting of message | ||
type Decryptor interface { | ||
Decrypt(payload []byte, msgID *pb.MessageIdData, msgMetadata *pb.MessageMetadata) ([]byte, error) | ||
CryptoFailureAction() int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this needed? I think the Decryptor should just have Decrypt and return an error. The caller of this can then figure out what to do with the error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pulsar/message.go
Outdated
|
||
// GetEncryptionContext get the ecryption context of message | ||
// It will be used by the application to parse undecrypted message | ||
GetEncryptionContext() EncryptionContext |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's return a pointer or interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pc.log.Error(err) | ||
switch pc.decryptor.CryptoFailureAction() { | ||
case crypto.ConsumerCryptoFailureActionFail: | ||
pc.log.Errorf("consuming message failed due to decryption err :%v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The java clients add this to the unacked message tracker do we need to do the same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me take a look at this..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we have this so may we just need to ack the message so it's not resent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, but i don't think it is good idea to do ack
.
Let's say user wants to consume this message again by providing proper crypto configuration, if we do ack
, then he may not be able to consume.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, I think it's better to resend the message than doing ack
.
Any other thoughts here ??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can do nack. I'll push changes with nack on failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cckellogg I've made the changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me will you please fix the CI issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @cckellogg
I don't think CI is failing due to last commit.
I see most of the open PR's are failing with the same issue as below.
TestNamespaceTopicsNamespaceDoesNotExit (56.25s) client_impl_test.go:387: Error Trace: client_impl_test.go:387 Error: Expected nil, but got: &errors.errorString{s:"server error: AuthorizationError: Exception occurred while trying to authorize GetTopicsOfNamespace"} Test: TestNamespaceTopicsNamespaceDoesNotExit
And last commit do not have any changes that effects client_impl.go
. More likely this has something to do with broker config that CI uses for running test cases.
please let me know If I'm wrong here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cckellogg looks like the CI is passed now.
Please do suggest if there are any other improvements on this PR.
pulsar/consumer_partition.go
Outdated
|
||
// error decrypting the payload | ||
if err != nil { | ||
pc.log.Error(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this and add a log under ConsumerCryptoFailureActionDiscard so there is more context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pulsar/consumer_partition.go
Outdated
replicationClusters: msgMeta.GetReplicateTo(), | ||
replicatedFrom: msgMeta.GetReplicatedFrom(), | ||
redeliveryCount: response.GetRedeliveryCount(), | ||
encryptionContext: createEncryptionContext(msgMeta), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the encryptionContext need to be added to messages that don't fail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there is a need to add this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the encryption context is only needed for failed messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
pulsar/consumer_partition.go
Outdated
case crypto.ConsumerCryptoFailureActionConsume: | ||
pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err) | ||
messages = append(messages, &message{ | ||
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit. we can use create a new variable here and move messages := make([]*message, 0)
back to where it was
messages := []*message{
{
},
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Breakdown of PR #552
This PR includes the encryption/decryption changes at consumer side.