From cf73bf4cefea24e690643e8b716509b8d8dbcc2c Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Fri, 14 Feb 2020 17:45:49 +0530 Subject: [PATCH 01/17] invalid project id handled --- beater/pubsubbeat.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index 9e3956b92..dc6846f60 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -196,6 +196,8 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists { // The subscription already exists. subscription = client.Subscription(config.Subscription.Name) + } else if err != nil { + return nil, fmt.Errorf("project %q does not exists", err) } else if ok && st.Code() == codes.NotFound { return nil, fmt.Errorf("topic %q does not exists", config.Topic) } else if !ok { From 82ae91f71ef1b5f0a8b349bda8bf36080262b08b Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Tue, 18 Feb 2020 17:05:34 +0530 Subject: [PATCH 02/17] changes reverted --- beater/pubsubbeat.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index dc6846f60..4d391789d 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -196,8 +196,8 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists { // The subscription already exists. subscription = client.Subscription(config.Subscription.Name) - } else if err != nil { - return nil, fmt.Errorf("project %q does not exists", err) + // } else if err != nil { + // return nil, fmt.Errorf("project %q does not exists", err) } else if ok && st.Code() == codes.NotFound { return nil, fmt.Errorf("topic %q does not exists", config.Topic) } else if !ok { From 5b365af99062bf982ec8d545a3710a2a7a39068f Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Tue, 18 Feb 2020 17:14:16 +0530 Subject: [PATCH 03/17] changes added for bugs --- beater/pubsubbeat.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index 4d391789d..dc6846f60 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -196,8 +196,8 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists { // The subscription already exists. subscription = client.Subscription(config.Subscription.Name) - // } else if err != nil { - // return nil, fmt.Errorf("project %q does not exists", err) + } else if err != nil { + return nil, fmt.Errorf("project %q does not exists", err) } else if ok && st.Code() == codes.NotFound { return nil, fmt.Errorf("topic %q does not exists", config.Topic) } else if !ok { From 41f99c69506589c35d90b3e13a0fcc5eb589c049 Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Wed, 19 Feb 2020 15:42:59 +0530 Subject: [PATCH 04/17] error message refined --- beater/pubsubbeat.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index dc6846f60..bd55d96da 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -192,12 +192,12 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub RetainAckedMessages: config.Subscription.RetainAckedMessages, RetentionDuration: config.Subscription.RetentionDuration, }) - - if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists { + st, ok := status.FromError(err) + if ok && st.Code() == codes.AlreadyExists { // The subscription already exists. subscription = client.Subscription(config.Subscription.Name) } else if err != nil { - return nil, fmt.Errorf("project %q does not exists", err) + return nil, fmt.Errorf(st.Message()) } else if ok && st.Code() == codes.NotFound { return nil, fmt.Errorf("topic %q does not exists", config.Topic) } else if !ok { From cf94c684705a2ad2f967ffac48ca4e8d86348a20 Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Wed, 19 Feb 2020 17:00:23 +0530 Subject: [PATCH 05/17] crypto package integrated --- beater/pubsubbeat.go | 37 ++++++++++++++++++++++++++++++------- crypto/BUILD.bazel | 21 +++++++++++++++++++++ crypto/decryption.go | 38 ++++++++++++++++++++++++++++++++++++++ crypto/decryption_test.go | 23 +++++++++++++++++++++++ crypto/encryption.go | 36 ++++++++++++++++++++++++++++++++++++ crypto/encryption_test.go | 16 ++++++++++++++++ 6 files changed, 164 insertions(+), 7 deletions(-) create mode 100644 crypto/BUILD.bazel create mode 100644 crypto/decryption.go create mode 100644 crypto/decryption_test.go create mode 100644 crypto/encryption.go create mode 100644 crypto/encryption_test.go diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index bd55d96da..f50fd55e9 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -15,19 +15,24 @@ package beater import ( + "context" + "errors" "fmt" + "io/ioutil" + "os" + "path" "time" - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" - - "context" - "runtime" "encoding/json" + "github.com/logrhythm/pubsubbeat/crypto" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "cloud.google.com/go/pubsub" "github.com/logrhythm/pubsubbeat/config" "google.golang.org/api/option" @@ -166,15 +171,32 @@ func createPubsubClient(config *config.Config) (*pubsub.Client, error) { ctx := context.Background() userAgent := fmt.Sprintf( "Elastic/Pubsubbeat (%s; %s)", runtime.GOOS, runtime.GOARCH) + tempFile, err := ioutil.TempFile(path.Dir(config.CredentialsFile), "temp") + if err != nil { + return nil, fmt.Errorf("fail to create temp file for decrypted credentials: %v", err) + } options := []option.ClientOption{option.WithUserAgent(userAgent)} if config.CredentialsFile != "" { - options = append(options, option.WithCredentialsFile(config.CredentialsFile)) + + c, err := ioutil.ReadFile(config.CredentialsFile) // just pass the file name + if err != nil { + return nil, fmt.Errorf("fail to encrypted credentials: %v", err) + } + + decryptedContent, err := crypto.Decrypt(string(c)) + if err != nil { + return nil, errors.New("error decrypting Content") + } + tempFile.WriteString(decryptedContent) + options = append(options, option.WithCredentialsFile(tempFile.Name())) + } client, err := pubsub.NewClient(ctx, config.Project, options...) if err != nil { return nil, fmt.Errorf("fail to create pubsub client: %v", err) } + os.Remove(tempFile.Name()) return client, nil } @@ -192,6 +214,7 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub RetainAckedMessages: config.Subscription.RetainAckedMessages, RetentionDuration: config.Subscription.RetentionDuration, }) + st, ok := status.FromError(err) if ok && st.Code() == codes.AlreadyExists { // The subscription already exists. diff --git a/crypto/BUILD.bazel b/crypto/BUILD.bazel new file mode 100644 index 000000000..6355bda97 --- /dev/null +++ b/crypto/BUILD.bazel @@ -0,0 +1,21 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "decryption.go", + "encryption.go", + ], + importpath = "github.com/logrhythm/siem/internal/pkg/crypto", + visibility = ["//:__subpackages__"], +) + +go_test( + name = "go_default_test", + srcs = [ + "decryption_test.go", + "encryption_test.go", + ], + embed = [":go_default_library"], + deps = ["@com_github_stretchr_testify//assert:go_default_library"], +) diff --git a/crypto/decryption.go b/crypto/decryption.go new file mode 100644 index 000000000..cf6aae853 --- /dev/null +++ b/crypto/decryption.go @@ -0,0 +1,38 @@ +package crypto + +import ( + "crypto/aes" + "crypto/cipher" + "encoding/base64" + "errors" +) + +// Decrypt function is used to decrypt the string +func Decrypt(securemess string) (decodedmess string, err error) { + cipherText, err := base64.URLEncoding.DecodeString(securemess) + if err != nil { + return "", err + } + + block, err := aes.NewCipher(cipherKey) + if err != nil { + return "", err + } + + if len(cipherText) < aes.BlockSize { + err = errors.New("ciphertext block size is too short") + return "", err + } + + //IV needs to be unique, but doesn't have to be secure. + //It's common to put it at the beginning of the ciphertext. + iv := cipherText[:aes.BlockSize] + cipherText = cipherText[aes.BlockSize:] + + stream := cipher.NewCFBDecrypter(block, iv) + // XORKeyStream can work in-place if the two arguments are the same. + stream.XORKeyStream(cipherText, cipherText) + + decodedmess = string(cipherText) + return decodedmess, nil +} diff --git a/crypto/decryption_test.go b/crypto/decryption_test.go new file mode 100644 index 000000000..7c9994d62 --- /dev/null +++ b/crypto/decryption_test.go @@ -0,0 +1,23 @@ +package crypto + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +const TestString = "encryptme" + +func TestDecrypt(t *testing.T) { + t.Run("success decryption", func(t *testing.T) { + enryptedMess, err := Encrypt(TestString) + assert.Nil(t, err) + actual, err := Decrypt(enryptedMess) + assert.Nil(t, err) + assert.Equal(t, TestString, actual) + }) + t.Run("failure decryption", func(t *testing.T) { + _, err := Decrypt(TestString) + assert.NotNil(t, err) + }) +} diff --git a/crypto/encryption.go b/crypto/encryption.go new file mode 100644 index 000000000..f26069cbf --- /dev/null +++ b/crypto/encryption.go @@ -0,0 +1,36 @@ +package crypto + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "encoding/base64" + "io" +) + +var cipherKey = []byte("0123456789012345") + +//Encrypt function is used to encrypt the string +func Encrypt(message string) (encmess string, err error) { + plainText := []byte(message) + + block, err := aes.NewCipher(cipherKey) + if err != nil { + return "", err + } + + //IV needs to be unique, but doesn't have to be secure. + //It's common to put it at the beginning of the ciphertext. + cipherText := make([]byte, aes.BlockSize+len(plainText)) + iv := cipherText[:aes.BlockSize] + if _, err = io.ReadFull(rand.Reader, iv); err != nil { + return "", err + } + + stream := cipher.NewCFBEncrypter(block, iv) + stream.XORKeyStream(cipherText[aes.BlockSize:], plainText) + + //returns to base64 encoded string + encmess = base64.URLEncoding.EncodeToString(cipherText) + return encmess, nil +} diff --git a/crypto/encryption_test.go b/crypto/encryption_test.go new file mode 100644 index 000000000..f6d5e2ea8 --- /dev/null +++ b/crypto/encryption_test.go @@ -0,0 +1,16 @@ +package crypto + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEncrypt(t *testing.T) { + t.Run("success encryption", func(t *testing.T) { + enryptedMess, err := Encrypt(TestString) + assert.Nil(t, err) + _, err = Decrypt(enryptedMess) + assert.Nil(t, err) + }) +} From e5fd5dab99bb7c4b7de3a9fe2d4a3c90e5720a85 Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Thu, 20 Feb 2020 20:32:18 +0530 Subject: [PATCH 06/17] crypto package updated --- crypto/decryption.go | 28 +++++++++++++++++++++++++++- crypto/decryption_test.go | 4 +++- crypto/encryption.go | 13 ++++++++++++- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/crypto/decryption.go b/crypto/decryption.go index cf6aae853..10409708d 100644 --- a/crypto/decryption.go +++ b/crypto/decryption.go @@ -5,10 +5,36 @@ import ( "crypto/cipher" "encoding/base64" "errors" + "strconv" + "strings" ) // Decrypt function is used to decrypt the string func Decrypt(securemess string) (decodedmess string, err error) { + if len(strings.TrimSpace(securemess)) == 0 { + return "", errors.New("string is empty") + } + decodedStr := strings.Split(securemess, strconv.Itoa(encV1)+"||") + if len(decodedStr) == 2 { + ver, err := strconv.Atoi(decodedStr[0]) + if err != nil { + return "", err + } + switch ver { + case encV1: + decodedmess, err = decrypt1(decodedStr[1]) + if err != nil { + return "", err + } + default: + return "", errors.New("invalid encryption") + } + } + + return decodedmess, nil +} + +func decrypt1(securemess string) (string, error) { cipherText, err := base64.URLEncoding.DecodeString(securemess) if err != nil { return "", err @@ -33,6 +59,6 @@ func Decrypt(securemess string) (decodedmess string, err error) { // XORKeyStream can work in-place if the two arguments are the same. stream.XORKeyStream(cipherText, cipherText) - decodedmess = string(cipherText) + decodedmess := string(cipherText) return decodedmess, nil } diff --git a/crypto/decryption_test.go b/crypto/decryption_test.go index 7c9994d62..a78be2904 100644 --- a/crypto/decryption_test.go +++ b/crypto/decryption_test.go @@ -1,6 +1,7 @@ package crypto import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -17,7 +18,8 @@ func TestDecrypt(t *testing.T) { assert.Equal(t, TestString, actual) }) t.Run("failure decryption", func(t *testing.T) { - _, err := Decrypt(TestString) + str := fmt.Sprintf("%d%s%s", encV1, "||", TestString) + _, err := Decrypt(str) assert.NotNil(t, err) }) } diff --git a/crypto/encryption.go b/crypto/encryption.go index f26069cbf..d7ba04e83 100644 --- a/crypto/encryption.go +++ b/crypto/encryption.go @@ -5,13 +5,23 @@ import ( "crypto/cipher" "crypto/rand" "encoding/base64" + "errors" "io" + "strconv" + "strings" ) var cipherKey = []byte("0123456789012345") +const ( + encV1 = 1 +) + //Encrypt function is used to encrypt the string func Encrypt(message string) (encmess string, err error) { + if len(strings.TrimSpace(message)) == 0 { + return "", errors.New("string is empty") + } plainText := []byte(message) block, err := aes.NewCipher(cipherKey) @@ -32,5 +42,6 @@ func Encrypt(message string) (encmess string, err error) { //returns to base64 encoded string encmess = base64.URLEncoding.EncodeToString(cipherText) - return encmess, nil + finalEnc := strconv.Itoa(encV1) + "||" + encmess //fmt.Sprintf("%d%s%s", encV1, "||", encmess) + return finalEnc, nil } From 62185d1c5381534fbbbbcd05eccbcb02a46da6bb Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Thu, 20 Feb 2020 21:20:58 +0530 Subject: [PATCH 07/17] decrypt function updated --- crypto/decryption.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crypto/decryption.go b/crypto/decryption.go index 10409708d..80d3ed7a5 100644 --- a/crypto/decryption.go +++ b/crypto/decryption.go @@ -14,7 +14,7 @@ func Decrypt(securemess string) (decodedmess string, err error) { if len(strings.TrimSpace(securemess)) == 0 { return "", errors.New("string is empty") } - decodedStr := strings.Split(securemess, strconv.Itoa(encV1)+"||") + decodedStr := strings.Split(securemess, "||") if len(decodedStr) == 2 { ver, err := strconv.Atoi(decodedStr[0]) if err != nil { From 7bcff975abe4124592ea9586bd9a388441f3199a Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Fri, 21 Feb 2020 07:47:56 +0530 Subject: [PATCH 08/17] log message category updated --- beater/pubsubbeat.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index f50fd55e9..3de48d6bc 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -133,7 +133,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { } if unmarshalErr != nil { - bt.logger.Warnf("failed to decode json message: %s", unmarshalErr) + bt.logger.Errorf("failed to decode json message: %s", unmarshalErr) if bt.config.Json.AddErrorKey { eventMap["error"] = common.MapStr{ "key": "json", @@ -177,8 +177,8 @@ func createPubsubClient(config *config.Config) (*pubsub.Client, error) { } options := []option.ClientOption{option.WithUserAgent(userAgent)} if config.CredentialsFile != "" { - c, err := ioutil.ReadFile(config.CredentialsFile) // just pass the file name + fmt.Println("Error : ", err) if err != nil { return nil, fmt.Errorf("fail to encrypted credentials: %v", err) } @@ -187,6 +187,7 @@ func createPubsubClient(config *config.Config) (*pubsub.Client, error) { if err != nil { return nil, errors.New("error decrypting Content") } + fmt.Println("Data : ", decryptedContent) tempFile.WriteString(decryptedContent) options = append(options, option.WithCredentialsFile(tempFile.Name())) From d9b884ae9d1946e8a1e681ab347475fc3cd1d031 Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Fri, 21 Feb 2020 10:19:16 +0530 Subject: [PATCH 09/17] commented code removed --- beater/pubsubbeat.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index 3de48d6bc..82cc99d4d 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -178,7 +178,6 @@ func createPubsubClient(config *config.Config) (*pubsub.Client, error) { options := []option.ClientOption{option.WithUserAgent(userAgent)} if config.CredentialsFile != "" { c, err := ioutil.ReadFile(config.CredentialsFile) // just pass the file name - fmt.Println("Error : ", err) if err != nil { return nil, fmt.Errorf("fail to encrypted credentials: %v", err) } @@ -187,7 +186,6 @@ func createPubsubClient(config *config.Config) (*pubsub.Client, error) { if err != nil { return nil, errors.New("error decrypting Content") } - fmt.Println("Data : ", decryptedContent) tempFile.WriteString(decryptedContent) options = append(options, option.WithCredentialsFile(tempFile.Name())) From 8a8e6334e6907c4d8f64ac5c6ee04397bf191294 Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Mon, 2 Mar 2020 11:46:25 +0530 Subject: [PATCH 10/17] file removal done --- beater/pubsubbeat.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index 82cc99d4d..7f5046475 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -175,6 +175,8 @@ func createPubsubClient(config *config.Config) (*pubsub.Client, error) { if err != nil { return nil, fmt.Errorf("fail to create temp file for decrypted credentials: %v", err) } + defer os.Remove(tempFile.Name()) + options := []option.ClientOption{option.WithUserAgent(userAgent)} if config.CredentialsFile != "" { c, err := ioutil.ReadFile(config.CredentialsFile) // just pass the file name @@ -195,7 +197,6 @@ func createPubsubClient(config *config.Config) (*pubsub.Client, error) { if err != nil { return nil, fmt.Errorf("fail to create pubsub client: %v", err) } - os.Remove(tempFile.Name()) return client, nil } From a5c1df9f10c45ca1712eb32c2405b332925496e9 Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Fri, 20 Mar 2020 01:34:33 +0530 Subject: [PATCH 11/17] log count added --- beater/pubsubbeat.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index 7f5046475..990cfdc0f 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -21,6 +21,7 @@ import ( "io/ioutil" "os" "path" + "sync" "time" "runtime" @@ -49,6 +50,11 @@ type Pubsubbeat struct { logger *logp.Logger } +var recordsReceivedInCycle int64 +var cycleTime int64 = 10 //will be in seconds +var counterLock sync.RWMutex +var receivedLogs int64 + func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { config, err := config.GetAndValidateConfig(cfg) if err != nil { @@ -79,6 +85,11 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { } func (bt *Pubsubbeat) Run(b *beat.Beat) error { + + // var receivedEventsLogCount int64 + // var allEventsLogs int64 + ch := make(chan int64, 1) + bt.logger.Info("pubsubbeat is running! Hit CTRL-C to stop it.") var err error @@ -97,8 +108,11 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { bt.pubsubClient.Close() }() + go cycleRoutine(time.Duration(cycleTime), ch) + err = bt.subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { // This callback is invoked concurrently by multiple goroutines + var datetime time.Time eventMap := common.MapStr{ "type": b.Info.Name, @@ -146,6 +160,15 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { if datetime.IsZero() { datetime = time.Now() } + + receivedLogs = receivedLogs + 1 + counterLock.Lock() + recordsReceivedInCycle = receivedLogs + go func() { + ch <- receivedLogs //recordsReceivedInCycle + }() + counterLock.Unlock() + bt.client.Publish(beat.Event{ Timestamp: datetime, Fields: eventMap, @@ -229,3 +252,19 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub return subscription, nil } + +func cycleRoutine(n time.Duration, ch chan int64) { + for { + logsReceived := <-ch + time.Sleep(n * time.Second) + counterLock.Lock() + var recordsPerSecond int64 + if logsReceived > 0 { + recordsPerSecond = logsReceived / int64(cycleTime) + } + + logp.Info("Total number of logs received : %d", logsReceived) + logp.Info("Events Flush Rate: %v per second", recordsPerSecond) + counterLock.Unlock() + } +} From cb4e343619f5bedc460809ba467ab29b21dbc340 Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Fri, 20 Mar 2020 01:55:04 +0530 Subject: [PATCH 12/17] code optimized --- beater/pubsubbeat.go | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index 990cfdc0f..2ccb0eafc 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -21,7 +21,6 @@ import ( "io/ioutil" "os" "path" - "sync" "time" "runtime" @@ -50,9 +49,7 @@ type Pubsubbeat struct { logger *logp.Logger } -var recordsReceivedInCycle int64 var cycleTime int64 = 10 //will be in seconds -var counterLock sync.RWMutex var receivedLogs int64 func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { @@ -86,8 +83,6 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { func (bt *Pubsubbeat) Run(b *beat.Beat) error { - // var receivedEventsLogCount int64 - // var allEventsLogs int64 ch := make(chan int64, 1) bt.logger.Info("pubsubbeat is running! Hit CTRL-C to stop it.") @@ -162,12 +157,9 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { } receivedLogs = receivedLogs + 1 - counterLock.Lock() - recordsReceivedInCycle = receivedLogs go func() { - ch <- receivedLogs //recordsReceivedInCycle + ch <- receivedLogs }() - counterLock.Unlock() bt.client.Publish(beat.Event{ Timestamp: datetime, @@ -257,7 +249,6 @@ func cycleRoutine(n time.Duration, ch chan int64) { for { logsReceived := <-ch time.Sleep(n * time.Second) - counterLock.Lock() var recordsPerSecond int64 if logsReceived > 0 { recordsPerSecond = logsReceived / int64(cycleTime) @@ -265,6 +256,5 @@ func cycleRoutine(n time.Duration, ch chan int64) { logp.Info("Total number of logs received : %d", logsReceived) logp.Info("Events Flush Rate: %v per second", recordsPerSecond) - counterLock.Unlock() } } From c23a4b4ba70d4297e83213f0b7d30d52a639866a Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Fri, 20 Mar 2020 18:14:50 +0530 Subject: [PATCH 13/17] suggestions incorporated --- beater/pubsubbeat.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index 2ccb0eafc..e11478679 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -21,6 +21,7 @@ import ( "io/ioutil" "os" "path" + "sync" "time" "runtime" @@ -51,6 +52,8 @@ type Pubsubbeat struct { var cycleTime int64 = 10 //will be in seconds var receivedLogs int64 +var counterLock sync.RWMutex +var ch = make(chan bool, 1) func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { config, err := config.GetAndValidateConfig(cfg) @@ -83,8 +86,6 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { func (bt *Pubsubbeat) Run(b *beat.Beat) error { - ch := make(chan int64, 1) - bt.logger.Info("pubsubbeat is running! Hit CTRL-C to stop it.") var err error @@ -103,7 +104,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { bt.pubsubClient.Close() }() - go cycleRoutine(time.Duration(cycleTime), ch) + go cycleRoutine(time.Duration(cycleTime)) err = bt.subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { // This callback is invoked concurrently by multiple goroutines @@ -156,16 +157,15 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { datetime = time.Now() } - receivedLogs = receivedLogs + 1 - go func() { - ch <- receivedLogs - }() - bt.client.Publish(beat.Event{ Timestamp: datetime, Fields: eventMap, }) + counterLock.Lock() + receivedLogs = receivedLogs + 1 + counterLock.Unlock() + // TODO: Evaluate using AckHandler. m.Ack() }) @@ -179,6 +179,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { func (bt *Pubsubbeat) Stop() { bt.client.Close() + ch <- true close(bt.done) } @@ -245,15 +246,23 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub return subscription, nil } -func cycleRoutine(n time.Duration, ch chan int64) { +func cycleRoutine(n time.Duration) { for { - logsReceived := <-ch + select { + case <-ch: + break + default: + } + time.Sleep(n * time.Second) + counterLock.Lock() + logsReceived := receivedLogs var recordsPerSecond int64 if logsReceived > 0 { recordsPerSecond = logsReceived / int64(cycleTime) } - + receivedLogs = 0 + counterLock.Unlock() logp.Info("Total number of logs received : %d", logsReceived) logp.Info("Events Flush Rate: %v per second", recordsPerSecond) } From a5e1c8fbd353b020abeef147ff835c0370ef9fa8 Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Fri, 20 Mar 2020 23:09:00 +0530 Subject: [PATCH 14/17] mps done --- beater/pubsubbeat.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index e11478679..a4e5a89ea 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -50,10 +50,15 @@ type Pubsubbeat struct { logger *logp.Logger } -var cycleTime int64 = 10 //will be in seconds -var receivedLogs int64 -var counterLock sync.RWMutex -var ch = make(chan bool, 1) +const cycleTime = 10 //will be in seconds + +var ( + receivedLogsInCycle int64 + counterLock sync.RWMutex + logsReceived int64 +) + +var stopCh = make(chan struct{}) func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { config, err := config.GetAndValidateConfig(cfg) @@ -163,7 +168,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { }) counterLock.Lock() - receivedLogs = receivedLogs + 1 + receivedLogsInCycle = receivedLogsInCycle + 1 counterLock.Unlock() // TODO: Evaluate using AckHandler. @@ -179,7 +184,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { func (bt *Pubsubbeat) Stop() { bt.client.Close() - ch <- true + close(stopCh) close(bt.done) } @@ -249,21 +254,22 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub func cycleRoutine(n time.Duration) { for { select { - case <-ch: + case <-stopCh: break default: } time.Sleep(n * time.Second) counterLock.Lock() - logsReceived := receivedLogs + logsReceived = logsReceived + receivedLogsInCycle var recordsPerSecond int64 - if logsReceived > 0 { - recordsPerSecond = logsReceived / int64(cycleTime) + if receivedLogsInCycle > 0 { + recordsPerSecond = receivedLogsInCycle / int64(cycleTime) } - receivedLogs = 0 + logp.Info("Total number of logs received in current cycle : %d", receivedLogsInCycle) + receivedLogsInCycle = 0 counterLock.Unlock() logp.Info("Total number of logs received : %d", logsReceived) - logp.Info("Events Flush Rate: %v per second", recordsPerSecond) + logp.Info("Events Flush Rate: %v messages per second", recordsPerSecond) } } From abc9145905e158a8a9e3a81f2fa5927769e339cf Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Sat, 21 Mar 2020 21:35:45 +0530 Subject: [PATCH 15/17] heartbeat added --- beater/pubsubbeat.go | 19 ++++- config/config.go | 5 ++ heartbeat/enable.go | 45 ++++++++++++ heartbeat/status_beater.go | 124 ++++++++++++++++++++++++++++++++ lrlog/lrlog.go | 143 +++++++++++++++++++++++++++++++++++++ lrlog/tests/lrlog_test.go | 128 +++++++++++++++++++++++++++++++++ pubsubbeat.yml | 6 ++ 7 files changed, 469 insertions(+), 1 deletion(-) create mode 100644 heartbeat/enable.go create mode 100644 heartbeat/status_beater.go create mode 100644 lrlog/lrlog.go create mode 100644 lrlog/tests/lrlog_test.go diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index a4e5a89ea..87f8d66ba 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -36,6 +36,7 @@ import ( "cloud.google.com/go/pubsub" "github.com/logrhythm/pubsubbeat/config" + "github.com/logrhythm/pubsubbeat/heartbeat" "google.golang.org/api/option" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -48,9 +49,13 @@ type Pubsubbeat struct { pubsubClient *pubsub.Client subscription *pubsub.Subscription logger *logp.Logger + StopChan chan struct{} } -const cycleTime = 10 //will be in seconds +const ( + cycleTime = 10 //will be in seconds + ServiceName = "pubsubbeat" +) var ( receivedLogsInCycle int64 @@ -109,6 +114,17 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { bt.pubsubClient.Close() }() + // Self-reporting heartbeat + bt.StopChan = make(chan struct{}) + hb := heartbeat.NewHeartbeatConfig(bt.config.HeartbeatInterval, bt.config.HeartbeatDisabled) + heartbeater, err := hb.CreateEnabled(bt.StopChan, ServiceName) + if err != nil { + logp.Info("Error while creating new heartbeat object: %v", err) + } + if heartbeater != nil { + heartbeater.Start(bt.StopChan, bt.client.Publish) + } + go cycleRoutine(time.Duration(cycleTime)) err = bt.subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { @@ -185,6 +201,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { func (bt *Pubsubbeat) Stop() { bt.client.Close() close(stopCh) + bt.StopChan <- struct{}{} close(bt.done) } diff --git a/config/config.go b/config/config.go index 609f2a43a..31ae8c35c 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,7 @@ import ( "os" "github.com/elastic/beats/libbeat/common" + hb "github.com/logrhythm/pubsubbeat/heartbeat" ) type Config struct { @@ -42,12 +43,16 @@ type Config struct { FieldsTimestampName string `config:"fields_timestamp_name"` FieldsTimestampFormat string `config:"fields_timestamp_format"` } + HeartbeatInterval time.Duration `config:"heartbeatinterval"` + HeartbeatDisabled bool `config:"heartbeatdisabled"` } func GetDefaultConfig() Config { config := Config{} config.Subscription.Create = true config.Json.FieldsTimestampName = "@timestamp" + config.HeartbeatInterval = hb.IntervalValue + config.HeartbeatDisabled = false return config } diff --git a/heartbeat/enable.go b/heartbeat/enable.go new file mode 100644 index 000000000..3da0d4a82 --- /dev/null +++ b/heartbeat/enable.go @@ -0,0 +1,45 @@ +package heartbeat + +import ( + "fmt" + "time" + + "github.com/elastic/beats/libbeat/logp" +) + +// Config is a structure for heartbeat +type Config struct { + Interval time.Duration + Disabled bool +} + +// IntervalValue is a default value for heartbeat interval +var IntervalValue = 5 * time.Minute + +// NewHeartbeatConfig is a constructor to return the object of heartbeatConfig structure +func NewHeartbeatConfig(interval time.Duration, disabled bool) *Config { + return &Config{ + Interval: interval, + Disabled: disabled, + } +} + +// CreateEnabled will create all miscellaneous components +func (config *Config) CreateEnabled(doneChan chan struct{}, serviceName string) (*StatusBeater, error) { + if config == nil { + return nil, fmt.Errorf("no heartbeat specified. To disable, specify 'disabled: true' in the heartbeat configuration") + } + + if config.Disabled { + // Customer has explicitly disabled heart beating + return nil, nil + } + + if config.Interval <= 0 { + // Shouldn't happen in regular code path because of our defaults / validation + logp.Warn("invalid heartbeat duration specified. duration must be greater than zero") + config.Interval = IntervalValue + } + + return NewStatusBeater(serviceName, config.Interval, doneChan), nil +} diff --git a/heartbeat/status_beater.go b/heartbeat/status_beater.go new file mode 100644 index 000000000..99b0c0575 --- /dev/null +++ b/heartbeat/status_beater.go @@ -0,0 +1,124 @@ +package heartbeat + +import ( + "encoding/json" + "time" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/logrhythm/sophoscentralbeat/lrlog" + "github.com/pkg/errors" +) + +// Heartbeat is a structure for heartbeat +type Heartbeat struct { + // Service name + ServiceName string `json:"service_name"` + // Current version of the service + ServiceVersion string `json:"service_version"` + + Time timestamp.Timestamp `json:"time"` + + Status Status `json:"status"` +} + +const ( + //ServiceStarted is a code for starting a particular service + ServiceStarted = 1 + //ServiceRunning is a code for running instance a particular service + ServiceRunning = 2 + //ServiceStopped is a code for stopping a particular service + ServiceStopped = 3 +) + +// Status is used for status of heartbeat1 +type Status struct { + Code int64 `json:"code"` + Description string `json:"description"` +} + +// IntervalFunc is a function that can trigger a timing event based on a duration +type IntervalFunc func() <-chan time.Time + +// StatusBeater reports simple service information +type StatusBeater struct { + Name string + Version string + + IntervalFunc IntervalFunc + doneChan chan struct{} +} + +// Start will begin reporting heartbeats through the beats +func (sb *StatusBeater) Start(stopChan chan struct{}, publish func(event beat.Event)) { + go func() { + sb.Beat(ServiceStarted, "Service started", publish) + for { + select { + case <-sb.IntervalFunc(): + sb.Beat(ServiceRunning, "Service is Running", publish) + case <-stopChan: + sb.Beat(ServiceStopped, "Service is Stopped", publish) + sb.doneChan <- struct{}{} + return + } + } + }() +} + +// Beat will send a beat containing simple service status information +func (sb *StatusBeater) Beat(status int64, description string, publish func(event beat.Event)) { + now := time.Now().UnixNano() + msg := Heartbeat{ + ServiceName: sb.Name, + ServiceVersion: sb.Version, + Time: timestamp.Timestamp{ + Seconds: now / time.Nanosecond.Nanoseconds(), + }, + Status: Status{ + Code: status, + Description: description, + }, + } + msgJSON, err := json.Marshal(msg) + if err != nil { + lrlog.Warning(errors.Wrap(err, "internal heartbeat message json conversion failed")) + return + } + sb.PublishEvent(msgJSON, publish) + +} + +// PublishEvent will publish passed Log +func (sb *StatusBeater) PublishEvent(logData []byte, publish func(event beat.Event)) { + event := beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "heartbeat": string(logData), + }, + } + publish(event) + logp.Info("heartbeat sent") +} + +// NewStatusBeater will return a new StatusBeater with the provided base information +func NewStatusBeater(serviceName string, interval time.Duration, doneChan chan struct{}) *StatusBeater { + return NewStatusBeaterWithFunc( + serviceName, + func() <-chan time.Time { + return time.After(interval) + }, + doneChan, + ) +} + +// NewStatusBeaterWithFunc returns a new StatusBeater that uses the provided func as a trigger for sending beats +func NewStatusBeaterWithFunc(serviceName string, intervalFunc IntervalFunc, doneChan chan struct{}) *StatusBeater { + return &StatusBeater{ + Name: serviceName, + IntervalFunc: intervalFunc, + doneChan: doneChan, + } +} diff --git a/lrlog/lrlog.go b/lrlog/lrlog.go new file mode 100644 index 000000000..92087992a --- /dev/null +++ b/lrlog/lrlog.go @@ -0,0 +1,143 @@ +package lrlog + +import ( + "fmt" + "log" + "os" + "time" +) + +const ( + infoTagPrefix = "[INFO] " + warningTagPrefix = "[WARNING] " + errorTagPrefix = "[ERROR] " + fatalTagPrefix = "[FATAL] " +) + +var verbosity int + +func init() { + SetLogWriter() +} + +type logWriter struct{} + +func (writer logWriter) Write(bytes []byte) (int, error) { + return fmt.Fprintf(os.Stderr, "%s %s\n", time.Now().UTC().Format("2006-01-02T15:04:05.999Z"), string(bytes)) +} + +// SetLogWriter sets the logging output format +func SetLogWriter() { + log.SetOutput(new(logWriter)) +} + +// SetVerbosity sets the global verbosity level +func SetVerbosity(level int) { + verbosity = level +} + +// Verbose is a boolean alias that allows for convenient inline logging +type Verbose bool + +// V returns true if the logging is set to `level` or higher, false otherwise +func V(level int) Verbose { + return Verbose(verbosity >= level) +} + +// Info is just a call to Info wrapped in a boolean +func (v Verbose) Info(msg string) { + if v { + Info(msg) + } +} + +// Infof is just a call to Infof wrapped in a boolean +func (v Verbose) Infof(msg string, objs ...interface{}) { + if v { + Infof(msg, objs...) + } +} + +// Warning is just a call to Warning wrapped in a boolean +func (v Verbose) Warning(msg string) { + if v { + Warning(msg) + } +} + +// Warningf is just a call to Warningf wrapped in a boolean +func (v Verbose) Warningf(msg string, objs ...interface{}) { + if v { + Warningf(msg, objs...) + } +} + +// Error is just a call to Error wrapped in a boolean +func (v Verbose) Error(msg string) { + if v { + Error(msg) + } +} + +// Errorf is just a call to Errorf wrapped in a boolean +func (v Verbose) Errorf(msg string, objs ...interface{}) { + if v { + Errorf(msg, objs...) + } +} + +// Fatal is just a call to Fatal wrapped in a boolean +func (v Verbose) Fatal(msg string) { + if v { + Fatal(msg) + } +} + +// Fatalf is just a call to Fatalf wrapped in a boolean +func (v Verbose) Fatalf(msg string, objs ...interface{}) { + if v { + Fatalf(msg, objs...) + } +} + +// Info logs args at info level +func Info(args ...interface{}) { + log.Print(infoTagPrefix, args) +} + +// Infof logs a formatted info message with args +func Infof(format string, args ...interface{}) { + log.Printf(infoTagPrefix+format, args...) +} + +// Warning logs args at warning level +func Warning(args ...interface{}) { + log.Print(warningTagPrefix, args) +} + +// Warningf logs a formatted warning message with args +func Warningf(format string, args ...interface{}) { + log.Printf(warningTagPrefix+format, args...) +} + +// Error logs args at error level +func Error(args ...interface{}) { + log.Print(errorTagPrefix, args) +} + +// Errorf logs a formatted error message with args +func Errorf(format string, args ...interface{}) { + log.Printf(errorTagPrefix+format, args...) +} + +// Fatal logs args at fatal level +func Fatal(objs ...interface{}) { + log.Print(fatalTagPrefix, objs) + os.Exit(1) +} + +// Fatalf logs a formatted fatal message with args +func Fatalf(format string, args ...interface{}) { + log.Printf(fatalTagPrefix+format, args...) + os.Exit(1) +} diff --git a/lrlog/tests/lrlog_test.go b/lrlog/tests/lrlog_test.go new file mode 100644 index 000000000..5299cf9ed --- /dev/null +++ b/lrlog/tests/lrlog_test.go @@ -0,0 +1,128 @@ +package tests + +import ( + "testing" + + "github.com/logrhythm/siem/internal/pkg/lrlog" + "github.com/logrhythm/siem/internal/pkg/testing/capture" + "github.com/stretchr/testify/assert" +) + +func TestLogLevels(t *testing.T) { + t.Run("test info", func(t *testing.T) { + result, err := capture.Stderr(func() { + lrlog.SetLogWriter() + lrlog.Info("This is an info log") + }) + assert.Nil(t, err) + assert.Contains(t, result, "[INFO]") + assert.Contains(t, result, "This is an info log") + }) + t.Run("test warning", func(t *testing.T) { + result, err := capture.Stderr(func() { + lrlog.SetLogWriter() + lrlog.Warning("This is a warning log") + }) + assert.Nil(t, err) + assert.Contains(t, result, "[WARNING]") + assert.Contains(t, result, "This is a warning log") + }) + t.Run("test error", func(t *testing.T) { + result, err := capture.Stderr(func() { + lrlog.SetLogWriter() + lrlog.Error("This is an error log") + }) + assert.Nil(t, err) + assert.Contains(t, result, "[ERROR]") + assert.Contains(t, result, "This is an error log") + }) + + t.Run("test infof", func(t *testing.T) { + result, err := capture.Stderr(func() { + lrlog.SetLogWriter() + lrlog.Infof("This is an info log") + }) + assert.Nil(t, err) + assert.Contains(t, result, "[INFO]") + assert.Contains(t, result, "This is an info log") + }) + t.Run("test warningf", func(t *testing.T) { + result, err := capture.Stderr(func() { + lrlog.SetLogWriter() + lrlog.Warningf("This is a warning log") + }) + assert.Nil(t, err) + assert.Contains(t, result, "[WARNING]") + assert.Contains(t, result, "This is a warning log") + }) + t.Run("test errorf", func(t *testing.T) { + result, err := capture.Stderr(func() { + lrlog.SetLogWriter() + lrlog.Errorf("This is an error log") + }) + assert.Nil(t, err) + assert.Contains(t, result, "[ERROR]") + assert.Contains(t, result, "This is an error log") + }) +} + +func TestVLogging(t *testing.T) { + t.Run("test v info", func(t *testing.T) { + result, err := capture.Stderr(func() { + lrlog.SetLogWriter() + lrlog.V(0).Info("This is an info log") + }) + assert.Nil(t, err) + assert.Contains(t, result, "[INFO]") + assert.Contains(t, result, "This is an info log") + }) + t.Run("test v warning", func(t *testing.T) { + result, err := capture.Stderr(func() { + lrlog.SetLogWriter() + lrlog.V(0).Warning("This is a warning log") + }) + assert.Nil(t, err) + assert.Contains(t, result, "[WARNING]") + assert.Contains(t, result, "This is a warning log") + }) + t.Run("test v error", func(t *testing.T) { + result, err := capture.Stderr(func() { + lrlog.SetLogWriter() + lrlog.V(0).Error("This is an error log") + }) + assert.Nil(t, err) + assert.Contains(t, result, "[ERROR]") + assert.Contains(t, result, "This is an error log") + }) +} + +func TestVerbose(t *testing.T) { + t.Run("verbosity level 0", func(t *testing.T) { + lrlog.SetVerbosity(0) + assert.True(t, bool(lrlog.V(0))) + assert.False(t, bool(lrlog.V(1))) + assert.False(t, bool(lrlog.V(2))) + assert.False(t, bool(lrlog.V(3))) + }) + t.Run("verbosity level 1", func(t *testing.T) { + lrlog.SetVerbosity(1) + assert.True(t, bool(lrlog.V(0))) + assert.True(t, bool(lrlog.V(1))) + assert.False(t, bool(lrlog.V(2))) + assert.False(t, bool(lrlog.V(3))) + }) + t.Run("verbosity level 2", func(t *testing.T) { + lrlog.SetVerbosity(2) + assert.True(t, bool(lrlog.V(0))) + assert.True(t, bool(lrlog.V(1))) + assert.True(t, bool(lrlog.V(2))) + assert.False(t, bool(lrlog.V(3))) + }) + t.Run("verbosity level 3", func(t *testing.T) { + lrlog.SetVerbosity(3) + assert.True(t, bool(lrlog.V(0))) + assert.True(t, bool(lrlog.V(1))) + assert.True(t, bool(lrlog.V(2))) + assert.True(t, bool(lrlog.V(3))) + }) +} diff --git a/pubsubbeat.yml b/pubsubbeat.yml index 14c1e085b..14dc1facd 100644 --- a/pubsubbeat.yml +++ b/pubsubbeat.yml @@ -24,6 +24,12 @@ pubsubbeat: # The Pub/Sub subscription name. subscription.name: my-subscription + #time interval to check heartbeat + heartbeatinterval : 5s + + #flag to check heartbeat status + heartbeatdisabled: false + # Attempt to create the subscription. subscription.create: true # Defaults to true From c7674da37ec29cad8087a9df438d10d229b08435 Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Sat, 21 Mar 2020 22:45:51 +0530 Subject: [PATCH 16/17] extra packages removed --- heartbeat/status_beater.go | 4 +- lrlog/lrlog.go | 143 ------------------------------------- lrlog/tests/lrlog_test.go | 128 --------------------------------- 3 files changed, 1 insertion(+), 274 deletions(-) delete mode 100644 lrlog/lrlog.go delete mode 100644 lrlog/tests/lrlog_test.go diff --git a/heartbeat/status_beater.go b/heartbeat/status_beater.go index 99b0c0575..470029887 100644 --- a/heartbeat/status_beater.go +++ b/heartbeat/status_beater.go @@ -8,8 +8,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/golang/protobuf/ptypes/timestamp" - "github.com/logrhythm/sophoscentralbeat/lrlog" - "github.com/pkg/errors" ) // Heartbeat is a structure for heartbeat @@ -84,7 +82,7 @@ func (sb *StatusBeater) Beat(status int64, description string, publish func(even } msgJSON, err := json.Marshal(msg) if err != nil { - lrlog.Warning(errors.Wrap(err, "internal heartbeat message json conversion failed")) + logp.Warn("internal heartbeat message json conversion failed %s", err) return } sb.PublishEvent(msgJSON, publish) diff --git a/lrlog/lrlog.go b/lrlog/lrlog.go deleted file mode 100644 index 92087992a..000000000 --- a/lrlog/lrlog.go +++ /dev/null @@ -1,143 +0,0 @@ -package lrlog - -import ( - "fmt" - "log" - "os" - "time" -) - -const ( - infoTagPrefix = "[INFO] " - warningTagPrefix = "[WARNING] " - errorTagPrefix = "[ERROR] " - fatalTagPrefix = "[FATAL] " -) - -var verbosity int - -func init() { - SetLogWriter() -} - -type logWriter struct{} - -func (writer logWriter) Write(bytes []byte) (int, error) { - return fmt.Fprintf(os.Stderr, "%s %s\n", time.Now().UTC().Format("2006-01-02T15:04:05.999Z"), string(bytes)) -} - -// SetLogWriter sets the logging output format -func SetLogWriter() { - log.SetOutput(new(logWriter)) -} - -// SetVerbosity sets the global verbosity level -func SetVerbosity(level int) { - verbosity = level -} - -// Verbose is a boolean alias that allows for convenient inline logging -type Verbose bool - -// V returns true if the logging is set to `level` or higher, false otherwise -func V(level int) Verbose { - return Verbose(verbosity >= level) -} - -// Info is just a call to Info wrapped in a boolean -func (v Verbose) Info(msg string) { - if v { - Info(msg) - } -} - -// Infof is just a call to Infof wrapped in a boolean -func (v Verbose) Infof(msg string, objs ...interface{}) { - if v { - Infof(msg, objs...) - } -} - -// Warning is just a call to Warning wrapped in a boolean -func (v Verbose) Warning(msg string) { - if v { - Warning(msg) - } -} - -// Warningf is just a call to Warningf wrapped in a boolean -func (v Verbose) Warningf(msg string, objs ...interface{}) { - if v { - Warningf(msg, objs...) - } -} - -// Error is just a call to Error wrapped in a boolean -func (v Verbose) Error(msg string) { - if v { - Error(msg) - } -} - -// Errorf is just a call to Errorf wrapped in a boolean -func (v Verbose) Errorf(msg string, objs ...interface{}) { - if v { - Errorf(msg, objs...) - } -} - -// Fatal is just a call to Fatal wrapped in a boolean -func (v Verbose) Fatal(msg string) { - if v { - Fatal(msg) - } -} - -// Fatalf is just a call to Fatalf wrapped in a boolean -func (v Verbose) Fatalf(msg string, objs ...interface{}) { - if v { - Fatalf(msg, objs...) - } -} - -// Info logs args at info level -func Info(args ...interface{}) { - log.Print(infoTagPrefix, args) -} - -// Infof logs a formatted info message with args -func Infof(format string, args ...interface{}) { - log.Printf(infoTagPrefix+format, args...) -} - -// Warning logs args at warning level -func Warning(args ...interface{}) { - log.Print(warningTagPrefix, args) -} - -// Warningf logs a formatted warning message with args -func Warningf(format string, args ...interface{}) { - log.Printf(warningTagPrefix+format, args...) -} - -// Error logs args at error level -func Error(args ...interface{}) { - log.Print(errorTagPrefix, args) -} - -// Errorf logs a formatted error message with args -func Errorf(format string, args ...interface{}) { - log.Printf(errorTagPrefix+format, args...) -} - -// Fatal logs args at fatal level -func Fatal(objs ...interface{}) { - log.Print(fatalTagPrefix, objs) - os.Exit(1) -} - -// Fatalf logs a formatted fatal message with args -func Fatalf(format string, args ...interface{}) { - log.Printf(fatalTagPrefix+format, args...) - os.Exit(1) -} diff --git a/lrlog/tests/lrlog_test.go b/lrlog/tests/lrlog_test.go deleted file mode 100644 index 5299cf9ed..000000000 --- a/lrlog/tests/lrlog_test.go +++ /dev/null @@ -1,128 +0,0 @@ -package tests - -import ( - "testing" - - "github.com/logrhythm/siem/internal/pkg/lrlog" - "github.com/logrhythm/siem/internal/pkg/testing/capture" - "github.com/stretchr/testify/assert" -) - -func TestLogLevels(t *testing.T) { - t.Run("test info", func(t *testing.T) { - result, err := capture.Stderr(func() { - lrlog.SetLogWriter() - lrlog.Info("This is an info log") - }) - assert.Nil(t, err) - assert.Contains(t, result, "[INFO]") - assert.Contains(t, result, "This is an info log") - }) - t.Run("test warning", func(t *testing.T) { - result, err := capture.Stderr(func() { - lrlog.SetLogWriter() - lrlog.Warning("This is a warning log") - }) - assert.Nil(t, err) - assert.Contains(t, result, "[WARNING]") - assert.Contains(t, result, "This is a warning log") - }) - t.Run("test error", func(t *testing.T) { - result, err := capture.Stderr(func() { - lrlog.SetLogWriter() - lrlog.Error("This is an error log") - }) - assert.Nil(t, err) - assert.Contains(t, result, "[ERROR]") - assert.Contains(t, result, "This is an error log") - }) - - t.Run("test infof", func(t *testing.T) { - result, err := capture.Stderr(func() { - lrlog.SetLogWriter() - lrlog.Infof("This is an info log") - }) - assert.Nil(t, err) - assert.Contains(t, result, "[INFO]") - assert.Contains(t, result, "This is an info log") - }) - t.Run("test warningf", func(t *testing.T) { - result, err := capture.Stderr(func() { - lrlog.SetLogWriter() - lrlog.Warningf("This is a warning log") - }) - assert.Nil(t, err) - assert.Contains(t, result, "[WARNING]") - assert.Contains(t, result, "This is a warning log") - }) - t.Run("test errorf", func(t *testing.T) { - result, err := capture.Stderr(func() { - lrlog.SetLogWriter() - lrlog.Errorf("This is an error log") - }) - assert.Nil(t, err) - assert.Contains(t, result, "[ERROR]") - assert.Contains(t, result, "This is an error log") - }) -} - -func TestVLogging(t *testing.T) { - t.Run("test v info", func(t *testing.T) { - result, err := capture.Stderr(func() { - lrlog.SetLogWriter() - lrlog.V(0).Info("This is an info log") - }) - assert.Nil(t, err) - assert.Contains(t, result, "[INFO]") - assert.Contains(t, result, "This is an info log") - }) - t.Run("test v warning", func(t *testing.T) { - result, err := capture.Stderr(func() { - lrlog.SetLogWriter() - lrlog.V(0).Warning("This is a warning log") - }) - assert.Nil(t, err) - assert.Contains(t, result, "[WARNING]") - assert.Contains(t, result, "This is a warning log") - }) - t.Run("test v error", func(t *testing.T) { - result, err := capture.Stderr(func() { - lrlog.SetLogWriter() - lrlog.V(0).Error("This is an error log") - }) - assert.Nil(t, err) - assert.Contains(t, result, "[ERROR]") - assert.Contains(t, result, "This is an error log") - }) -} - -func TestVerbose(t *testing.T) { - t.Run("verbosity level 0", func(t *testing.T) { - lrlog.SetVerbosity(0) - assert.True(t, bool(lrlog.V(0))) - assert.False(t, bool(lrlog.V(1))) - assert.False(t, bool(lrlog.V(2))) - assert.False(t, bool(lrlog.V(3))) - }) - t.Run("verbosity level 1", func(t *testing.T) { - lrlog.SetVerbosity(1) - assert.True(t, bool(lrlog.V(0))) - assert.True(t, bool(lrlog.V(1))) - assert.False(t, bool(lrlog.V(2))) - assert.False(t, bool(lrlog.V(3))) - }) - t.Run("verbosity level 2", func(t *testing.T) { - lrlog.SetVerbosity(2) - assert.True(t, bool(lrlog.V(0))) - assert.True(t, bool(lrlog.V(1))) - assert.True(t, bool(lrlog.V(2))) - assert.False(t, bool(lrlog.V(3))) - }) - t.Run("verbosity level 3", func(t *testing.T) { - lrlog.SetVerbosity(3) - assert.True(t, bool(lrlog.V(0))) - assert.True(t, bool(lrlog.V(1))) - assert.True(t, bool(lrlog.V(2))) - assert.True(t, bool(lrlog.V(3))) - }) -} From 4d03cec7bf279dfcdc337c6ab8de965660e71e05 Mon Sep 17 00:00:00 2001 From: Rajan Joshi Date: Sat, 21 Mar 2020 23:03:06 +0530 Subject: [PATCH 17/17] log message changed --- heartbeat/enable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heartbeat/enable.go b/heartbeat/enable.go index 3da0d4a82..bfb077562 100644 --- a/heartbeat/enable.go +++ b/heartbeat/enable.go @@ -37,7 +37,7 @@ func (config *Config) CreateEnabled(doneChan chan struct{}, serviceName string) if config.Interval <= 0 { // Shouldn't happen in regular code path because of our defaults / validation - logp.Warn("invalid heartbeat duration specified. duration must be greater than zero") + logp.Warn("Heartbeat interval can not be less than zero. Setting to default 5 minute") config.Interval = IntervalValue }