diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index 83c1d5e55..8b157c5c1 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -15,21 +15,28 @@ package beater import ( + "context" + "errors" "fmt" + "io/ioutil" + "os" + "path" + "sync" "time" - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" - - "context" - "runtime" "encoding/json" + "github.com/logrhythm/pubsubbeat/crypto" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "cloud.google.com/go/pubsub" "github.com/logrhythm/pubsubbeat/config" + "github.com/logrhythm/pubsubbeat/heartbeat" "google.golang.org/api/option" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -42,8 +49,22 @@ type Pubsubbeat struct { pubsubClient *pubsub.Client subscription *pubsub.Subscription logger *logp.Logger + StopChan chan struct{} } +const ( + cycleTime = 10 //will be in seconds + ServiceName = "pubsubbeat" +) + +var ( + receivedLogsInCycle int64 + counterLock sync.RWMutex + logsReceived int64 +) + +var stopCh = make(chan struct{}) + func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { config, err := config.GetAndValidateConfig(cfg) if err != nil { @@ -74,6 +95,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { } func (bt *Pubsubbeat) Run(b *beat.Beat) error { + bt.logger.Info("pubsubbeat is running! Hit CTRL-C to stop it.") var err error @@ -92,8 +114,22 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { bt.pubsubClient.Close() }() + // Self-reporting heartbeat + bt.StopChan = make(chan struct{}) + hb := heartbeat.NewHeartbeatConfig(bt.config.HeartbeatInterval, bt.config.HeartbeatDisabled) + heartbeater, err := hb.CreateEnabled(bt.StopChan, ServiceName) + if err != nil { + logp.Info("Error while creating new heartbeat object: %v", err) + } + if heartbeater != nil { + heartbeater.Start(bt.StopChan, bt.client.Publish) + } + + go cycleRoutine(time.Duration(cycleTime)) + err = bt.subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { // This callback is invoked concurrently by multiple goroutines + var datetime time.Time eventMap := common.MapStr{ "type": b.Info.Name, @@ -128,7 +164,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { } if unmarshalErr != nil { - bt.logger.Warnf("failed to decode json message: %s", unmarshalErr) + bt.logger.Errorf("failed to decode json message: %s", unmarshalErr) if bt.config.Json.AddErrorKey { eventMap["error"] = common.MapStr{ "key": "json", @@ -141,11 +177,16 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { if datetime.IsZero() { datetime = time.Now() } + bt.client.Publish(beat.Event{ Timestamp: datetime, Fields: eventMap, }) + counterLock.Lock() + receivedLogsInCycle = receivedLogsInCycle + 1 + counterLock.Unlock() + // TODO: Evaluate using AckHandler. m.Ack() }) @@ -159,6 +200,8 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { func (bt *Pubsubbeat) Stop() { bt.client.Close() + close(stopCh) + bt.StopChan <- struct{}{} close(bt.done) } @@ -166,9 +209,26 @@ func createPubsubClient(config *config.Config) (*pubsub.Client, error) { ctx := context.Background() userAgent := fmt.Sprintf( "Elastic/Pubsubbeat (%s; %s)", runtime.GOOS, runtime.GOARCH) + tempFile, err := ioutil.TempFile(path.Dir(config.CredentialsFile), "temp") + if err != nil { + return nil, fmt.Errorf("fail to create temp file for decrypted credentials: %v", err) + } + defer os.Remove(tempFile.Name()) + options := []option.ClientOption{option.WithUserAgent(userAgent)} if config.CredentialsFile != "" { - options = append(options, option.WithCredentialsFile(config.CredentialsFile)) + c, err := ioutil.ReadFile(config.CredentialsFile) // just pass the file name + if err != nil { + return nil, fmt.Errorf("fail to encrypted credentials: %v", err) + } + + decryptedContent, err := crypto.Decrypt(string(c)) + if err != nil { + return nil, errors.New("error decrypting Content") + } + tempFile.WriteString(decryptedContent) + options = append(options, option.WithCredentialsFile(tempFile.Name())) + } client, err := pubsub.NewClient(ctx, config.Project, options...) @@ -193,9 +253,12 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub RetentionDuration: config.Subscription.RetentionDuration, }) - if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists { + st, ok := status.FromError(err) + if ok && st.Code() == codes.AlreadyExists { // The subscription already exists. subscription = client.Subscription(config.Subscription.Name) + } else if err != nil { + return nil, fmt.Errorf(st.Message()) } else if ok && st.Code() == codes.NotFound { return nil, fmt.Errorf("topic %q does not exists", config.Topic) } else if !ok { @@ -204,3 +267,26 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub return subscription, nil } + +func cycleRoutine(n time.Duration) { + for { + select { + case <-stopCh: + break + default: + } + + time.Sleep(n * time.Second) + counterLock.Lock() + logsReceived = logsReceived + receivedLogsInCycle + var recordsPerSecond int64 + if receivedLogsInCycle > 0 { + recordsPerSecond = receivedLogsInCycle / int64(cycleTime) + } + logp.Info("Total number of logs received in current cycle : %d", receivedLogsInCycle) + receivedLogsInCycle = 0 + counterLock.Unlock() + logp.Info("Total number of logs received : %d", logsReceived) + logp.Info("Events Flush Rate: %v messages per second", recordsPerSecond) + } +} diff --git a/config/config.go b/config/config.go index 609f2a43a..31ae8c35c 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,7 @@ import ( "os" "github.com/elastic/beats/libbeat/common" + hb "github.com/logrhythm/pubsubbeat/heartbeat" ) type Config struct { @@ -42,12 +43,16 @@ type Config struct { FieldsTimestampName string `config:"fields_timestamp_name"` FieldsTimestampFormat string `config:"fields_timestamp_format"` } + HeartbeatInterval time.Duration `config:"heartbeatinterval"` + HeartbeatDisabled bool `config:"heartbeatdisabled"` } func GetDefaultConfig() Config { config := Config{} config.Subscription.Create = true config.Json.FieldsTimestampName = "@timestamp" + config.HeartbeatInterval = hb.IntervalValue + config.HeartbeatDisabled = false return config } diff --git a/crypto/BUILD.bazel b/crypto/BUILD.bazel new file mode 100644 index 000000000..6355bda97 --- /dev/null +++ b/crypto/BUILD.bazel @@ -0,0 +1,21 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "decryption.go", + "encryption.go", + ], + importpath = "github.com/logrhythm/siem/internal/pkg/crypto", + visibility = ["//:__subpackages__"], +) + +go_test( + name = "go_default_test", + srcs = [ + "decryption_test.go", + "encryption_test.go", + ], + embed = [":go_default_library"], + deps = ["@com_github_stretchr_testify//assert:go_default_library"], +) diff --git a/crypto/decryption.go b/crypto/decryption.go new file mode 100644 index 000000000..80d3ed7a5 --- /dev/null +++ b/crypto/decryption.go @@ -0,0 +1,64 @@ +package crypto + +import ( + "crypto/aes" + "crypto/cipher" + "encoding/base64" + "errors" + "strconv" + "strings" +) + +// Decrypt function is used to decrypt the string +func Decrypt(securemess string) (decodedmess string, err error) { + if len(strings.TrimSpace(securemess)) == 0 { + return "", errors.New("string is empty") + } + decodedStr := strings.Split(securemess, "||") + if len(decodedStr) == 2 { + ver, err := strconv.Atoi(decodedStr[0]) + if err != nil { + return "", err + } + switch ver { + case encV1: + decodedmess, err = decrypt1(decodedStr[1]) + if err != nil { + return "", err + } + default: + return "", errors.New("invalid encryption") + } + } + + return decodedmess, nil +} + +func decrypt1(securemess string) (string, error) { + cipherText, err := base64.URLEncoding.DecodeString(securemess) + if err != nil { + return "", err + } + + block, err := aes.NewCipher(cipherKey) + if err != nil { + return "", err + } + + if len(cipherText) < aes.BlockSize { + err = errors.New("ciphertext block size is too short") + return "", err + } + + //IV needs to be unique, but doesn't have to be secure. + //It's common to put it at the beginning of the ciphertext. + iv := cipherText[:aes.BlockSize] + cipherText = cipherText[aes.BlockSize:] + + stream := cipher.NewCFBDecrypter(block, iv) + // XORKeyStream can work in-place if the two arguments are the same. + stream.XORKeyStream(cipherText, cipherText) + + decodedmess := string(cipherText) + return decodedmess, nil +} diff --git a/crypto/decryption_test.go b/crypto/decryption_test.go new file mode 100644 index 000000000..a78be2904 --- /dev/null +++ b/crypto/decryption_test.go @@ -0,0 +1,25 @@ +package crypto + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +const TestString = "encryptme" + +func TestDecrypt(t *testing.T) { + t.Run("success decryption", func(t *testing.T) { + enryptedMess, err := Encrypt(TestString) + assert.Nil(t, err) + actual, err := Decrypt(enryptedMess) + assert.Nil(t, err) + assert.Equal(t, TestString, actual) + }) + t.Run("failure decryption", func(t *testing.T) { + str := fmt.Sprintf("%d%s%s", encV1, "||", TestString) + _, err := Decrypt(str) + assert.NotNil(t, err) + }) +} diff --git a/crypto/encryption.go b/crypto/encryption.go new file mode 100644 index 000000000..d7ba04e83 --- /dev/null +++ b/crypto/encryption.go @@ -0,0 +1,47 @@ +package crypto + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "encoding/base64" + "errors" + "io" + "strconv" + "strings" +) + +var cipherKey = []byte("0123456789012345") + +const ( + encV1 = 1 +) + +//Encrypt function is used to encrypt the string +func Encrypt(message string) (encmess string, err error) { + if len(strings.TrimSpace(message)) == 0 { + return "", errors.New("string is empty") + } + plainText := []byte(message) + + block, err := aes.NewCipher(cipherKey) + if err != nil { + return "", err + } + + //IV needs to be unique, but doesn't have to be secure. + //It's common to put it at the beginning of the ciphertext. + cipherText := make([]byte, aes.BlockSize+len(plainText)) + iv := cipherText[:aes.BlockSize] + if _, err = io.ReadFull(rand.Reader, iv); err != nil { + return "", err + } + + stream := cipher.NewCFBEncrypter(block, iv) + stream.XORKeyStream(cipherText[aes.BlockSize:], plainText) + + //returns to base64 encoded string + encmess = base64.URLEncoding.EncodeToString(cipherText) + finalEnc := strconv.Itoa(encV1) + "||" + encmess //fmt.Sprintf("%d%s%s", encV1, "||", encmess) + return finalEnc, nil +} diff --git a/crypto/encryption_test.go b/crypto/encryption_test.go new file mode 100644 index 000000000..f6d5e2ea8 --- /dev/null +++ b/crypto/encryption_test.go @@ -0,0 +1,16 @@ +package crypto + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEncrypt(t *testing.T) { + t.Run("success encryption", func(t *testing.T) { + enryptedMess, err := Encrypt(TestString) + assert.Nil(t, err) + _, err = Decrypt(enryptedMess) + assert.Nil(t, err) + }) +} diff --git a/heartbeat/enable.go b/heartbeat/enable.go new file mode 100644 index 000000000..bfb077562 --- /dev/null +++ b/heartbeat/enable.go @@ -0,0 +1,45 @@ +package heartbeat + +import ( + "fmt" + "time" + + "github.com/elastic/beats/libbeat/logp" +) + +// Config is a structure for heartbeat +type Config struct { + Interval time.Duration + Disabled bool +} + +// IntervalValue is a default value for heartbeat interval +var IntervalValue = 5 * time.Minute + +// NewHeartbeatConfig is a constructor to return the object of heartbeatConfig structure +func NewHeartbeatConfig(interval time.Duration, disabled bool) *Config { + return &Config{ + Interval: interval, + Disabled: disabled, + } +} + +// CreateEnabled will create all miscellaneous components +func (config *Config) CreateEnabled(doneChan chan struct{}, serviceName string) (*StatusBeater, error) { + if config == nil { + return nil, fmt.Errorf("no heartbeat specified. To disable, specify 'disabled: true' in the heartbeat configuration") + } + + if config.Disabled { + // Customer has explicitly disabled heart beating + return nil, nil + } + + if config.Interval <= 0 { + // Shouldn't happen in regular code path because of our defaults / validation + logp.Warn("Heartbeat interval can not be less than zero. Setting to default 5 minute") + config.Interval = IntervalValue + } + + return NewStatusBeater(serviceName, config.Interval, doneChan), nil +} diff --git a/heartbeat/status_beater.go b/heartbeat/status_beater.go new file mode 100644 index 000000000..470029887 --- /dev/null +++ b/heartbeat/status_beater.go @@ -0,0 +1,122 @@ +package heartbeat + +import ( + "encoding/json" + "time" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/golang/protobuf/ptypes/timestamp" +) + +// Heartbeat is a structure for heartbeat +type Heartbeat struct { + // Service name + ServiceName string `json:"service_name"` + // Current version of the service + ServiceVersion string `json:"service_version"` + + Time timestamp.Timestamp `json:"time"` + + Status Status `json:"status"` +} + +const ( + //ServiceStarted is a code for starting a particular service + ServiceStarted = 1 + //ServiceRunning is a code for running instance a particular service + ServiceRunning = 2 + //ServiceStopped is a code for stopping a particular service + ServiceStopped = 3 +) + +// Status is used for status of heartbeat1 +type Status struct { + Code int64 `json:"code"` + Description string `json:"description"` +} + +// IntervalFunc is a function that can trigger a timing event based on a duration +type IntervalFunc func() <-chan time.Time + +// StatusBeater reports simple service information +type StatusBeater struct { + Name string + Version string + + IntervalFunc IntervalFunc + doneChan chan struct{} +} + +// Start will begin reporting heartbeats through the beats +func (sb *StatusBeater) Start(stopChan chan struct{}, publish func(event beat.Event)) { + go func() { + sb.Beat(ServiceStarted, "Service started", publish) + for { + select { + case <-sb.IntervalFunc(): + sb.Beat(ServiceRunning, "Service is Running", publish) + case <-stopChan: + sb.Beat(ServiceStopped, "Service is Stopped", publish) + sb.doneChan <- struct{}{} + return + } + } + }() +} + +// Beat will send a beat containing simple service status information +func (sb *StatusBeater) Beat(status int64, description string, publish func(event beat.Event)) { + now := time.Now().UnixNano() + msg := Heartbeat{ + ServiceName: sb.Name, + ServiceVersion: sb.Version, + Time: timestamp.Timestamp{ + Seconds: now / time.Nanosecond.Nanoseconds(), + }, + Status: Status{ + Code: status, + Description: description, + }, + } + msgJSON, err := json.Marshal(msg) + if err != nil { + logp.Warn("internal heartbeat message json conversion failed %s", err) + return + } + sb.PublishEvent(msgJSON, publish) + +} + +// PublishEvent will publish passed Log +func (sb *StatusBeater) PublishEvent(logData []byte, publish func(event beat.Event)) { + event := beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "heartbeat": string(logData), + }, + } + publish(event) + logp.Info("heartbeat sent") +} + +// NewStatusBeater will return a new StatusBeater with the provided base information +func NewStatusBeater(serviceName string, interval time.Duration, doneChan chan struct{}) *StatusBeater { + return NewStatusBeaterWithFunc( + serviceName, + func() <-chan time.Time { + return time.After(interval) + }, + doneChan, + ) +} + +// NewStatusBeaterWithFunc returns a new StatusBeater that uses the provided func as a trigger for sending beats +func NewStatusBeaterWithFunc(serviceName string, intervalFunc IntervalFunc, doneChan chan struct{}) *StatusBeater { + return &StatusBeater{ + Name: serviceName, + IntervalFunc: intervalFunc, + doneChan: doneChan, + } +} diff --git a/pubsubbeat.yml b/pubsubbeat.yml index 14c1e085b..14dc1facd 100644 --- a/pubsubbeat.yml +++ b/pubsubbeat.yml @@ -24,6 +24,12 @@ pubsubbeat: # The Pub/Sub subscription name. subscription.name: my-subscription + #time interval to check heartbeat + heartbeatinterval : 5s + + #flag to check heartbeat status + heartbeatdisabled: false + # Attempt to create the subscription. subscription.create: true # Defaults to true