diff --git a/go.mod b/go.mod index 67a7bb725b..354f5b4d06 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/google/uuid v1.1.2 github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/klauspost/compress v1.10.8 - github.com/kr/pretty v0.2.0 // indirect github.com/linkedin/goavro/v2 v2.9.8 github.com/opentracing/opentracing-go v1.2.0 github.com/pierrec/lz4 v2.0.5+incompatible @@ -27,7 +26,6 @@ require ( github.com/stretchr/testify v1.5.1 go.uber.org/atomic v1.7.0 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2 diff --git a/go.sum b/go.sum index 2ccd39eb36..8b372c7c5d 100644 --- a/go.sum +++ b/go.sum @@ -22,7 +22,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= -github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU= @@ -32,8 +31,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU= -github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ= @@ -73,8 +70,6 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= -github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= @@ -82,7 +77,6 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= -github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg= github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -169,8 +163,6 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= @@ -190,7 +182,6 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= @@ -212,7 +203,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -227,7 +217,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -259,9 +248,5 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pulsar/encryption.go b/pulsar/encryption.go new file mode 100644 index 0000000000..aade2ca276 --- /dev/null +++ b/pulsar/encryption.go @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import "github.com/apache/pulsar-client-go/pulsar/crypto" + +// ProducerEncryptionInfo encryption related fields required by the producer +type ProducerEncryptionInfo struct { + // KeyReader read RSA public/private key pairs + KeyReader crypto.KeyReader + + // MessageCrypto used to encrypt and decrypt the data and session keys + MessageCrypto crypto.MessageCrypto + + // Keys list of encryption key names to encrypt session key + Keys []string + + // ProducerCryptoFailureAction action to be taken on failure of message encryption + // default is ProducerCryptoFailureActionFail + ProducerCryptoFailureAction int +} diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 3e1601f156..92d624922d 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -23,6 +23,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/apache/pulsar-client-go/pulsar/internal/compression" + "github.com/apache/pulsar-client-go/pulsar/internal/crypto" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" ) @@ -35,7 +36,7 @@ type BuffersPool interface { type BatcherBuilderProvider func( maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, - bufferPool BuffersPool, logger log.Logger, + bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error) // BatchBuilder is a interface of batch builders @@ -51,12 +52,12 @@ type BatchBuilder interface { ) bool // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. - Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) + Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}, err error) // Flush all the messages buffered in multiple batches and wait until all // messages have been successfully persisted. FlushBatches() ( - batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, + batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, errors []error, ) // Return the batch container batch message in multiple batches. @@ -93,13 +94,15 @@ type batchContainer struct { buffersPool BuffersPool log log.Logger + + encryptor crypto.Encryptor } // newBatchContainer init a batchContainer func newBatchContainer( maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, - bufferPool BuffersPool, logger log.Logger, + bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) batchContainer { bc := batchContainer{ @@ -122,6 +125,7 @@ func newBatchContainer( compressionProvider: getCompressionProvider(compressionType, level), buffersPool: bufferPool, log: logger, + encryptor: encryptor, } if compressionType != pb.CompressionType_NONE { @@ -135,12 +139,12 @@ func newBatchContainer( func NewBatchBuilder( maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, - bufferPool BuffersPool, logger log.Logger, + bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error) { bc := newBatchContainer( maxMessages, maxBatchSize, producerName, producerID, compressionType, - level, bufferPool, logger, + level, bufferPool, logger, encryptor, ) return &bc, nil @@ -211,11 +215,11 @@ func (bc *batchContainer) reset() { // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. func (bc *batchContainer) Flush() ( - batchData Buffer, sequenceID uint64, callbacks []interface{}, + batchData Buffer, sequenceID uint64, callbacks []interface{}, err error, ) { if bc.numMessages == 0 { // No-Op for empty batch - return nil, 0, nil + return nil, 0, nil, nil } bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages) @@ -229,19 +233,21 @@ func (bc *batchContainer) Flush() ( if buffer == nil { buffer = NewBuffer(int(uncompressedSize * 3 / 2)) } - serializeBatch( - buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, - ) + + if err = serializeBatch( + buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, bc.encryptor, + ); err == nil { // no error in serializing Batch + sequenceID = bc.cmdSend.Send.GetSequenceId() + } callbacks = bc.callbacks - sequenceID = bc.cmdSend.Send.GetSequenceId() bc.reset() - return buffer, sequenceID, callbacks + return buffer, sequenceID, callbacks, err } // FlushBatches only for multiple batches container func (bc *batchContainer) FlushBatches() ( - batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, + batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, errors []error, ) { panic("single batch container not support FlushBatches(), please use Flush() instead") } diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index af6bac5dd1..b91c0b6341 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -24,6 +24,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/apache/pulsar-client-go/pulsar/internal/compression" + "github.com/apache/pulsar-client-go/pulsar/internal/crypto" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" ) @@ -221,9 +222,21 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, uncompressedPayload Buffer, - compressionProvider compression.Provider) { + compressionProvider compression.Provider, + encryptor crypto.Encryptor) error { // Wire format // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD] + + // compress the payload + compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice()) + + // encrypt the compressed payload + encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata) + if err != nil { + // error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail + return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err) + } + cmdSize := uint32(proto.Size(cmdSend)) msgMetadataSize := uint32(proto.Size(msgMetadata)) @@ -234,7 +247,7 @@ func serializeBatch(wb Buffer, // Write cmd wb.WriteUint32(cmdSize) wb.ResizeIfNeeded(cmdSize) - _, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize]) + _, err = cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize]) if err != nil { panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err)) } @@ -255,12 +268,8 @@ func serializeBatch(wb Buffer, } wb.WrittenBytes(msgMetadataSize) - // Make sure the buffer has enough space to hold the compressed data - // and perform the compression in-place - maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes()))) - wb.ResizeIfNeeded(maxSize) - b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice()) - wb.WrittenBytes(uint32(len(b))) + // add payload to the buffer + wb.Write(encryptedPayload) // Write checksum at created checksum-placeholder frameEndIdx := wb.WriterIndex() @@ -269,6 +278,7 @@ func serializeBatch(wb Buffer, // Set Sizes and checksum in the fixed-size header wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame wb.PutUint32(checksum, checksumIdx) + return nil } // ConvertFromStringMap convert a string map to a KeyValue []byte diff --git a/pulsar/internal/crypto/encryptor.go b/pulsar/internal/crypto/encryptor.go new file mode 100644 index 0000000000..7fdbf06a09 --- /dev/null +++ b/pulsar/internal/crypto/encryptor.go @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package crypto + +import ( + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" +) + +// Encryptor support encryption +type Encryptor interface { + Encrypt([]byte, *pb.MessageMetadata) ([]byte, error) +} diff --git a/pulsar/internal/crypto/noop_encryptor.go b/pulsar/internal/crypto/noop_encryptor.go new file mode 100644 index 0000000000..4512e7bd69 --- /dev/null +++ b/pulsar/internal/crypto/noop_encryptor.go @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package crypto + +import ( + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" +) + +type noopEncryptor struct{} + +func NewNoopEncryptor() Encryptor { + return &noopEncryptor{} +} + +// Encrypt Noop ecryptor +func (e *noopEncryptor) Encrypt(data []byte, msgMetadata *pb.MessageMetadata) ([]byte, error) { + return data, nil +} diff --git a/pulsar/internal/crypto/producer_encryptor.go b/pulsar/internal/crypto/producer_encryptor.go new file mode 100644 index 0000000000..a5b972da10 --- /dev/null +++ b/pulsar/internal/crypto/producer_encryptor.go @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package crypto + +import ( + "fmt" + + "github.com/apache/pulsar-client-go/pulsar/crypto" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + "github.com/apache/pulsar-client-go/pulsar/log" +) + +type producerEncryptor struct { + keys []string + keyReader crypto.KeyReader + messageCrypto crypto.MessageCrypto + logger log.Logger + producerCryptoFailureAction int +} + +func NewProducerEncryptor(keys []string, + keyReader crypto.KeyReader, + messageCrypto crypto.MessageCrypto, + producerCryptoFailureAction int, + logger log.Logger) Encryptor { + return &producerEncryptor{ + keys: keys, + keyReader: keyReader, + messageCrypto: messageCrypto, + logger: logger, + producerCryptoFailureAction: producerCryptoFailureAction, + } +} + +// Encrypt producer encryptor +func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetadata) ([]byte, error) { + // encrypt payload + encryptedPayload, err := e.messageCrypto.Encrypt(e.keys, + e.keyReader, + crypto.NewMessageMetadataSupplier(msgMetadata), + payload) + + // error encryping the payload + if err != nil { + // error occurred in encrypting the payload + // crypto ProducerCryptoFailureAction is set to send + // send unencrypted message + if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend { + e.logger. + WithError(err). + Warnf("Encryption failed for payload sending unencrypted message ProducerCryptoFailureAction is set to send") + return payload, nil + } + + return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload :%v", err) + } + return encryptedPayload, nil +} diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go index 545c2c8996..940aa9f8d6 100644 --- a/pulsar/internal/key_based_batch_builder.go +++ b/pulsar/internal/key_based_batch_builder.go @@ -24,6 +24,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar/internal/compression" + "github.com/apache/pulsar-client-go/pulsar/internal/crypto" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" ) @@ -85,14 +86,14 @@ func (h *keyBasedBatches) Val(key string) *batchContainer { func NewKeyBasedBatchBuilder( maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, - bufferPool BuffersPool, logger log.Logger, + bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error) { bb := &keyBasedBatchContainer{ batches: newKeyBasedBatches(), batchContainer: newBatchContainer( maxMessages, maxBatchSize, producerName, producerID, - compressionType, level, bufferPool, logger, + compressionType, level, bufferPool, logger, encryptor, ), compressionType: compressionType, level: level, @@ -144,7 +145,7 @@ func (bc *keyBasedBatchContainer) Add( // create batchContainer for new key t := newBatchContainer( bc.maxMessages, bc.maxBatchSize, bc.producerName, bc.producerID, - bc.compressionType, bc.level, bc.buffersPool, bc.log, + bc.compressionType, bc.level, bc.buffersPool, bc.log, bc.encryptor, ) batchPart = &t bc.batches.Add(msgKey, &t) @@ -179,11 +180,11 @@ func (bc *keyBasedBatchContainer) reset() { // Flush all the messages buffered in multiple batches and wait until all // messages have been successfully persisted. func (bc *keyBasedBatchContainer) FlushBatches() ( - batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{}, + batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{}, errors []error, ) { if bc.numMessages == 0 { // No-Op for empty batch - return nil, nil, nil + return nil, nil, nil, nil } bc.log.Debug("keyBasedBatchContainer flush: messages: ", bc.numMessages) @@ -194,6 +195,7 @@ func (bc *keyBasedBatchContainer) FlushBatches() ( batchesData = make([]Buffer, batchesLen) sequenceIDs = make([]uint64, batchesLen) callbacks = make([][]interface{}, batchesLen) + errors = make([]error, batchesLen) bc.batches.l.RLock() defer bc.batches.l.RUnlock() @@ -203,21 +205,22 @@ func (bc *keyBasedBatchContainer) FlushBatches() ( sort.Strings(sortedKeys) for _, k := range sortedKeys { container := bc.batches.containers[k] - b, s, c := container.Flush() + b, s, c, err := container.Flush() if b != nil { batchesData[idx] = b sequenceIDs[idx] = s callbacks[idx] = c + errors[idx] = err } idx++ } bc.reset() - return batchesData, sequenceIDs, callbacks + return batchesData, sequenceIDs, callbacks, errors } func (bc *keyBasedBatchContainer) Flush() ( - batchData Buffer, sequenceID uint64, callbacks []interface{}, + batchData Buffer, sequenceID uint64, callbacks []interface{}, err error, ) { panic("multi batches container not support Flush(), please use FlushBatches() instead") } diff --git a/pulsar/producer.go b/pulsar/producer.go index ffbdebbb46..07a8f751e0 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -163,6 +163,9 @@ type ProducerOptions struct { // PartitionsAutoDiscoveryInterval is the time interval for the background process to discover new partitions // Default is 1 minute PartitionsAutoDiscoveryInterval time.Duration + + // Encryption necessary fields to perform encryption of message + Encryption *ProducerEncryptionInfo } // Producer is used to publish messages on a topic diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 8b3d33dadd..5f5e0f040d 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -19,11 +19,14 @@ package pulsar import ( "context" + "fmt" "sync" "sync/atomic" "time" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal/compression" + internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto" "github.com/gogo/protobuf/proto" @@ -126,6 +129,24 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions p.producerName = options.Name } + encryption := options.Encryption + // add default message crypto if not provided + if encryption != nil && len(encryption.Keys) > 0 { + if encryption.KeyReader == nil { + return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil") + } + + if encryption.MessageCrypto == nil { + logCtx := fmt.Sprintf("[%v] [%v] [%v]", p.topic, p.producerName, p.producerID) + messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx, true, logger) + if err != nil { + logger.WithError(err).Error("Unable to get MessageCrypto instance. Producer creation is abandoned") + return nil, err + } + p.options.Encryption.MessageCrypto = messageCrypto + } + } + err := p.grabCnx() if err != nil { logger.WithError(err).Error("Failed to create producer") @@ -143,6 +164,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions if p.options.SendTimeout > 0 { go p.failTimeoutMessages() } + go p.runEventsLoop() return p, nil @@ -197,13 +219,25 @@ func (p *partitionProducer) grabCnx() error { } p.producerName = res.Response.ProducerSuccess.GetProducerName() + + var encryptor internalcrypto.Encryptor + if p.options.Encryption != nil { + encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys, + p.options.Encryption.KeyReader, + p.options.Encryption.MessageCrypto, + p.options.Encryption.ProducerCryptoFailureAction, p.log) + } else { + encryptor = internalcrypto.NewNoopEncryptor() + } + if p.options.DisableBatching { provider, _ := GetBatcherBuilderProvider(DefaultBatchBuilder) p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType), compression.Level(p.options.CompressionLevel), p, - p.log) + p.log, + encryptor) if err != nil { return err } @@ -217,7 +251,8 @@ func (p *partitionProducer) grabCnx() error { p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType), compression.Level(p.options.CompressionLevel), p, - p.log) + p.log, + encryptor) if err != nil { return err } @@ -458,11 +493,22 @@ type pendingItem struct { } func (p *partitionProducer) internalFlushCurrentBatch() { - batchData, sequenceID, callbacks := p.batchBuilder.Flush() + batchData, sequenceID, callbacks, err := p.batchBuilder.Flush() if batchData == nil { return } + // error occurred in batch flush + // report it using callback + if err != nil { + for _, cb := range callbacks { + if sr, ok := cb.(*sendRequest); ok { + sr.callback(nil, sr.msg, err) + } + } + return + } + p.pendingQueue.Put(&pendingItem{ sentAt: time.Now(), batchData: batchData, @@ -577,12 +623,22 @@ func (p *partitionProducer) failTimeoutMessages() { } func (p *partitionProducer) internalFlushCurrentBatches() { - batchesData, sequenceIDs, callbacks := p.batchBuilder.FlushBatches() + batchesData, sequenceIDs, callbacks, errors := p.batchBuilder.FlushBatches() if batchesData == nil { return } for i := range batchesData { + // error occurred in processing batch + // report it using callback + if errors[i] != nil { + for _, cb := range callbacks[i] { + if sr, ok := cb.(*sendRequest); ok { + sr.callback(nil, sr.msg, errors[i]) + } + } + continue + } if batchesData[i] == nil { continue } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 7c3dbd76bb..6211f0181e 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -30,6 +30,8 @@ import ( "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/stretchr/testify/assert" + "github.com/apache/pulsar-client-go/pulsar/crypto" + plog "github.com/apache/pulsar-client-go/pulsar/log" log "github.com/sirupsen/logrus" ) @@ -996,6 +998,113 @@ func TestSendContextExpired(t *testing.T) { makeHTTPCall(t, http.MethodDelete, quotaURL, "") } +func TestProducerWithRSAEncryption(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, plog.DefaultNopLogger()) + assert.Nil(t, err) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + MessageCrypto: msgCrypto, + Keys: []string{"my-app.key"}, + }, + Schema: NewStringSchema(nil), + }) + + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if _, err := producer.Send(ctx, &ProducerMessage{ + Value: fmt.Sprintf("hello-%d", i), + }); err != nil { + log.Fatal(err) + } + } +} + +func TestProducuerCreationFailOnNilKeyReader(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + + msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, plog.DefaultNopLogger()) + assert.Nil(t, err) + + // create producer + // Producer creation should fail as keyreader is nil + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + Encryption: &ProducerEncryptionInfo{ + MessageCrypto: msgCrypto, + Keys: []string{"my-app.key"}, + }, + Schema: NewStringSchema(nil), + }) + + assert.NotNil(t, err) + assert.Nil(t, producer) +} + +func TestProducuerSendFailOnInvalidKey(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + + msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, plog.DefaultNopLogger()) + assert.Nil(t, err) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/invalid_pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + MessageCrypto: msgCrypto, + Keys: []string{"my-app.key"}, + }, + Schema: NewStringSchema(nil), + }) + + assert.Nil(t, err) + assert.NotNil(t, producer) + + // producer should send return an error as keyreader is configured with wrong pub.key and fail while encrypting message + mid, err := producer.Send(context.Background(), &ProducerMessage{ + Value: "test", + }) + + assert.NotNil(t, err) + assert.Nil(t, mid) +} + type noopProduceInterceptor struct{} func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}