-
Notifications
You must be signed in to change notification settings - Fork 346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Encryption support producer #560
Merged
cckellogg
merged 18 commits into
apache:master
from
Fanatics:encryption-support-ext-producer
Sep 3, 2021
Merged
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
d8bb8fe
add ability to encrypt messages
aa40a8d
fix typo
3b8d912
lint fixes
e1a8108
address review suggestions
d200b7c
revert go mod
feaf120
remove encryption context
a2bd72b
try to fix check issues
f204347
Merge branch 'master' into encryption-support-ext-producer
GPrabhudas a561923
remove unused code
559dbaf
Merge branch 'encryption-support-ext-producer' of https://github.com/…
d7246bc
remove embedded crypto struct
62937c5
review suggestions
39d43b2
remove duplicate log
d88b04e
lint code style issue fix
41b10bc
return error from flush methods on serialization error
c362133
update test case and do lazy data key generation
06c7612
address review changes
4dc8ddb
add comments on test case
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package pulsar | ||
|
||
import "github.com/apache/pulsar-client-go/pulsar/crypto" | ||
|
||
// ProducerEncryptionInfo encryption related fields required by the producer | ||
type ProducerEncryptionInfo struct { | ||
// KeyReader read RSA public/private key pairs | ||
KeyReader crypto.KeyReader | ||
|
||
// MessageCrypto used to encrypt and decrypt the data and session keys | ||
MessageCrypto crypto.MessageCrypto | ||
|
||
// Keys list of encryption key names to encrypt session key | ||
Keys []string | ||
|
||
// ProducerCryptoFailureAction action to be taken on failure of message encryption | ||
// default is ProducerCryptoFailureActionFail | ||
ProducerCryptoFailureAction int | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ import ( | |
"github.com/gogo/protobuf/proto" | ||
|
||
"github.com/apache/pulsar-client-go/pulsar/internal/compression" | ||
"github.com/apache/pulsar-client-go/pulsar/internal/crypto" | ||
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" | ||
) | ||
|
||
|
@@ -221,9 +222,21 @@ func serializeBatch(wb Buffer, | |
cmdSend *pb.BaseCommand, | ||
msgMetadata *pb.MessageMetadata, | ||
uncompressedPayload Buffer, | ||
compressionProvider compression.Provider) { | ||
compressionProvider compression.Provider, | ||
encryptor crypto.Encryptor) { | ||
// Wire format | ||
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD] | ||
|
||
// compress the payload | ||
compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We want to compress before encrypting? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per java implementation => Yes compress and then encrypt. |
||
|
||
// encrypt the compressed payload | ||
encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata) | ||
if err != nil { | ||
// error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail | ||
panic(fmt.Sprintf("Encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err)) | ||
GPrabhudas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
cmdSize := uint32(proto.Size(cmdSend)) | ||
msgMetadataSize := uint32(proto.Size(msgMetadata)) | ||
|
||
|
@@ -234,7 +247,7 @@ func serializeBatch(wb Buffer, | |
// Write cmd | ||
wb.WriteUint32(cmdSize) | ||
wb.ResizeIfNeeded(cmdSize) | ||
_, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize]) | ||
_, err = cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize]) | ||
if err != nil { | ||
panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err)) | ||
} | ||
|
@@ -255,12 +268,8 @@ func serializeBatch(wb Buffer, | |
} | ||
wb.WrittenBytes(msgMetadataSize) | ||
|
||
// Make sure the buffer has enough space to hold the compressed data | ||
// and perform the compression in-place | ||
maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes()))) | ||
wb.ResizeIfNeeded(maxSize) | ||
b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice()) | ||
wb.WrittenBytes(uint32(len(b))) | ||
// add payload to the buffer | ||
wb.Write(encryptedPayload) | ||
|
||
// Write checksum at created checksum-placeholder | ||
frameEndIdx := wb.WriterIndex() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package crypto | ||
|
||
import ( | ||
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" | ||
) | ||
|
||
// Encryptor support encryption | ||
type Encryptor interface { | ||
Encrypt([]byte, *pb.MessageMetadata) ([]byte, error) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package crypto | ||
|
||
import ( | ||
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" | ||
) | ||
|
||
type noopEncryptor struct{} | ||
|
||
func NewNoopEncryptor() Encryptor { | ||
return &noopEncryptor{} | ||
} | ||
|
||
// Encrypt Noop ecryptor | ||
func (e *noopEncryptor) Encrypt(data []byte, msgMetadata *pb.MessageMetadata) ([]byte, error) { | ||
return data, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package crypto | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/apache/pulsar-client-go/pulsar/crypto" | ||
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" | ||
"github.com/apache/pulsar-client-go/pulsar/log" | ||
) | ||
|
||
type producerEncryptor struct { | ||
keys []string | ||
keyReader crypto.KeyReader | ||
messageCrypto crypto.MessageCrypto | ||
logger log.Logger | ||
producerCryptoFailureAction int | ||
} | ||
|
||
func NewProducerEncryptor(keys []string, | ||
keyReader crypto.KeyReader, | ||
messageCrypto crypto.MessageCrypto, | ||
producerCryptoFailureAction int, | ||
logger log.Logger) Encryptor { | ||
return &producerEncryptor{ | ||
keys: keys, | ||
keyReader: keyReader, | ||
messageCrypto: messageCrypto, | ||
logger: logger, | ||
producerCryptoFailureAction: producerCryptoFailureAction, | ||
} | ||
} | ||
|
||
// Encrypt producer encryptor | ||
func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetadata) ([]byte, error) { | ||
// encrypt payload | ||
encryptedPayload, err := e.messageCrypto.Encrypt(e.keys, | ||
e.keyReader, | ||
crypto.NewMessageMetadataSupplier(msgMetadata), | ||
payload) | ||
|
||
// error encryping the payload | ||
if err != nil { | ||
// error occurred in encrypting the payload | ||
// crypto ProducerCryptoFailureAction is set to send | ||
// send unencrypted message | ||
if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend { | ||
e.logger. | ||
WithError(err). | ||
Warnf("Encryption failed for payload sending unencrypted message ProducerCryptoFailureAction is set to send") | ||
return payload, nil | ||
} | ||
|
||
return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload :%v", err) | ||
} | ||
return encryptedPayload, nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious why is the mod and sum file changing? Can these changes be done in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me recheck again :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synced with master branch