Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encryption support ext consumer #612

Merged
merged 25 commits into from
Oct 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ type ConsumerOptions struct {

// MaxReconnectToBroker set the maximum retry number of reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint

// Decryption decryption related fields to decrypt the encrypted message
Decryption *MessageDecryptionInfo
}

// Consumer is an interface that abstracts behavior of Pulsar's consumer
Expand Down
1 change: 1 addition & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
maxReconnectToBroker: c.options.MaxReconnectToBroker,
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: c.options.Decryption,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
Expand Down
108 changes: 107 additions & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (

"github.com/gogo/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
cryptointernal "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"

Expand Down Expand Up @@ -97,6 +99,7 @@ type partitionConsumerOpts struct {
maxReconnectToBroker *uint
keySharedPolicy *KeySharedPolicy
schema Schema
decryption *MessageDecryptionInfo
}

type partitionConsumer struct {
Expand Down Expand Up @@ -141,6 +144,7 @@ type partitionConsumer struct {
providersMutex sync.RWMutex
compressionProviders map[pb.CompressionType]compression.Provider
metrics *internal.TopicMetrics
decryptor cryptointernal.Decryptor
}

func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
Expand Down Expand Up @@ -175,6 +179,27 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
"subscription": options.subscription,
"consumerID": pc.consumerID,
})

var decryptor cryptointernal.Decryptor
if pc.options.decryption == nil {
decryptor = cryptointernal.NewNoopDecryptor() // default to noopDecryptor
} else {
if options.decryption.MessageCrypto == nil {
messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt", false, pc.log)
if err != nil {
return nil, err
}
options.decryption.MessageCrypto = messageCrypto
}
decryptor = cryptointernal.NewConsumerDecryptor(
options.decryption.KeyReader,
options.decryption.MessageCrypto,
pc.log,
)
}

pc.decryptor = decryptor

pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, pc.log)

err := pc.grabConn()
Expand Down Expand Up @@ -479,7 +504,54 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
return err
}

uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload)
decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
// error decrypting the payload
if err != nil {
// default crypto failure action
crypToFailureAction := crypto.ConsumerCryptoFailureActionFail
if pc.options.decryption != nil {
crypToFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
}

switch crypToFailureAction {
case crypto.ConsumerCryptoFailureActionFail:
pc.log.Errorf("consuming message failed due to decryption err :%v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The java clients add this to the unacked message tracker do we need to do the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me take a look at this..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we have this so may we just need to ack the message so it's not resent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, but i don't think it is good idea to do ack.

Let's say user wants to consume this message again by providing proper crypto configuration, if we do ack, then he may not be able to consume.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I think it's better to resend the message than doing ack.
Any other thoughts here ??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can do nack. I'll push changes with nack on failure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cckellogg I've made the changes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me will you please fix the CI issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @cckellogg

I don't think CI is failing due to last commit.

I see most of the open PR's are failing with the same issue as below.

TestNamespaceTopicsNamespaceDoesNotExit (56.25s) client_impl_test.go:387: Error Trace: client_impl_test.go:387 Error: Expected nil, but got: &errors.errorString{s:"server error: AuthorizationError: Exception occurred while trying to authorize GetTopicsOfNamespace"} Test: TestNamespaceTopicsNamespaceDoesNotExit

And last commit do not have any changes that effects client_impl.go. More likely this has something to do with broker config that CI uses for running test cases.

please let me know If I'm wrong here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cckellogg looks like the CI is passed now.
Please do suggest if there are any other improvements on this PR.

pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, nil))
return err
case crypto.ConsumerCryptoFailureActionDiscard:
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
return fmt.Errorf("discarding message on decryption error :%v", err)
case crypto.ConsumerCryptoFailureActionConsume:
pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
messages := []*message{
{
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
key: msgMeta.GetPartitionKey(),
producerName: msgMeta.GetProducerName(),
properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
topic: pc.topic,
msgID: newMessageID(
int64(pbMsgID.GetLedgerId()),
int64(pbMsgID.GetEntryId()),
pbMsgID.GetBatchIndex(),
pc.partitionIdx,
),
payLoad: headersAndPayload.ReadableSlice(),
schema: pc.options.schema,
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
encryptionContext: createEncryptionContext(msgMeta),
},
}
pc.queueCh <- messages
return nil
}
}

// decryption is success, decompress the payload
uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, internal.NewBufferWrapper(decryptedPayload))
if err != nil {
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
return err
Expand All @@ -492,6 +564,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
if msgMeta.NumMessagesInBatch != nil {
numMsgs = int(msgMeta.GetNumMessagesInBatch())
}

messages := make([]*message, 0)
var ackTracker *ackTracker
// are there multiple messages in this batch?
Expand Down Expand Up @@ -589,6 +662,39 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) b
return pc.startMessageID.greaterEqual(msgID.messageID)
}

// create EncryptionContext from message metadata
// this will be used to decrypt the message payload outside of this client
// it is the responsibility of end user to decrypt the payload
// It will be used only when crypto failure action is set to consume i.e crypto.ConsumerCryptoFailureActionConsume
func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
encCtx := EncryptionContext{
Algorithm: msgMeta.GetEncryptionAlgo(),
Param: msgMeta.GetEncryptionParam(),
UncompressedSize: int(msgMeta.GetUncompressedSize()),
BatchSize: int(msgMeta.GetNumMessagesInBatch()),
}

if msgMeta.Compression != nil {
encCtx.CompressionType = CompressionType(*msgMeta.Compression)
}

kMap := map[string]EncryptionKey{}
for _, k := range msgMeta.GetEncryptionKeys() {
metaMap := map[string]string{}
for _, m := range k.GetMetadata() {
metaMap[*m.Key] = *m.Value
}

kMap[*k.Key] = EncryptionKey{
KeyValue: k.GetValue(),
Metadata: metaMap,
}
}

encCtx.Keys = kMap
return &encCtx
}

func (pc *partitionConsumer) ConnectionClosed() {
// Trigger reconnection in the consumer goroutine
pc.log.Debug("connection closed and send to connectClosedCh")
Expand Down
4 changes: 4 additions & 0 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"

"github.com/stretchr/testify/assert"
Expand All @@ -36,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
compressionProviders: make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
metrics: internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
decryptor: crypto.NewNoopDecryptor(),
}

headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
Expand Down Expand Up @@ -67,6 +69,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
compressionProviders: make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
metrics: internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
decryptor: crypto.NewNoopDecryptor(),
}

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
Expand Down Expand Up @@ -98,6 +101,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
compressionProviders: make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
metrics: internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
decryptor: crypto.NewNoopDecryptor(),
}

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
Expand Down
Loading