Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishek.upadhyay authored and abhishek.upadhyay committed May 28, 2020
2 parents 1f979bb + bab4564 commit 021b62c
Show file tree
Hide file tree
Showing 10 changed files with 446 additions and 9 deletions.
104 changes: 95 additions & 9 deletions beater/pubsubbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,28 @@
package beater

import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"sync"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"context"

"runtime"

"encoding/json"

"github.com/logrhythm/pubsubbeat/crypto"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"cloud.google.com/go/pubsub"
"github.com/logrhythm/pubsubbeat/config"
"github.com/logrhythm/pubsubbeat/heartbeat"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -42,8 +49,22 @@ type Pubsubbeat struct {
pubsubClient *pubsub.Client
subscription *pubsub.Subscription
logger *logp.Logger
StopChan chan struct{}
}

const (
cycleTime = 10 //will be in seconds
ServiceName = "pubsubbeat"
)

var (
receivedLogsInCycle int64
counterLock sync.RWMutex
logsReceived int64
)

var stopCh = make(chan struct{})

func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config, err := config.GetAndValidateConfig(cfg)
if err != nil {
Expand Down Expand Up @@ -74,6 +95,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
}

func (bt *Pubsubbeat) Run(b *beat.Beat) error {

bt.logger.Info("pubsubbeat is running! Hit CTRL-C to stop it.")

var err error
Expand All @@ -92,8 +114,22 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error {
bt.pubsubClient.Close()
}()

// Self-reporting heartbeat
bt.StopChan = make(chan struct{})
hb := heartbeat.NewHeartbeatConfig(bt.config.HeartbeatInterval, bt.config.HeartbeatDisabled)
heartbeater, err := hb.CreateEnabled(bt.StopChan, ServiceName)
if err != nil {
logp.Info("Error while creating new heartbeat object: %v", err)
}
if heartbeater != nil {
heartbeater.Start(bt.StopChan, bt.client.Publish)
}

go cycleRoutine(time.Duration(cycleTime))

err = bt.subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
// This callback is invoked concurrently by multiple goroutines

var datetime time.Time
eventMap := common.MapStr{
"type": b.Info.Name,
Expand Down Expand Up @@ -128,7 +164,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error {
}

if unmarshalErr != nil {
bt.logger.Warnf("failed to decode json message: %s", unmarshalErr)
bt.logger.Errorf("failed to decode json message: %s", unmarshalErr)
if bt.config.Json.AddErrorKey {
eventMap["error"] = common.MapStr{
"key": "json",
Expand All @@ -141,11 +177,16 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error {
if datetime.IsZero() {
datetime = time.Now()
}

bt.client.Publish(beat.Event{
Timestamp: datetime,
Fields: eventMap,
})

counterLock.Lock()
receivedLogsInCycle = receivedLogsInCycle + 1
counterLock.Unlock()

// TODO: Evaluate using AckHandler.
m.Ack()
})
Expand All @@ -159,16 +200,35 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error {

func (bt *Pubsubbeat) Stop() {
bt.client.Close()
close(stopCh)
bt.StopChan <- struct{}{}
close(bt.done)
}

func createPubsubClient(config *config.Config) (*pubsub.Client, error) {
ctx := context.Background()
userAgent := fmt.Sprintf(
"Elastic/Pubsubbeat (%s; %s)", runtime.GOOS, runtime.GOARCH)
tempFile, err := ioutil.TempFile(path.Dir(config.CredentialsFile), "temp")
if err != nil {
return nil, fmt.Errorf("fail to create temp file for decrypted credentials: %v", err)
}
defer os.Remove(tempFile.Name())

options := []option.ClientOption{option.WithUserAgent(userAgent)}
if config.CredentialsFile != "" {
options = append(options, option.WithCredentialsFile(config.CredentialsFile))
c, err := ioutil.ReadFile(config.CredentialsFile) // just pass the file name
if err != nil {
return nil, fmt.Errorf("fail to encrypted credentials: %v", err)
}

decryptedContent, err := crypto.Decrypt(string(c))
if err != nil {
return nil, errors.New("error decrypting Content")
}
tempFile.WriteString(decryptedContent)
options = append(options, option.WithCredentialsFile(tempFile.Name()))

}

client, err := pubsub.NewClient(ctx, config.Project, options...)
Expand All @@ -193,9 +253,12 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub
RetentionDuration: config.Subscription.RetentionDuration,
})

if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
st, ok := status.FromError(err)
if ok && st.Code() == codes.AlreadyExists {
// The subscription already exists.
subscription = client.Subscription(config.Subscription.Name)
} else if err != nil {
return nil, fmt.Errorf(st.Message())
} else if ok && st.Code() == codes.NotFound {
return nil, fmt.Errorf("topic %q does not exists", config.Topic)
} else if !ok {
Expand All @@ -204,3 +267,26 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub

return subscription, nil
}

func cycleRoutine(n time.Duration) {
for {
select {
case <-stopCh:
break
default:
}

time.Sleep(n * time.Second)
counterLock.Lock()
logsReceived = logsReceived + receivedLogsInCycle
var recordsPerSecond int64
if receivedLogsInCycle > 0 {
recordsPerSecond = receivedLogsInCycle / int64(cycleTime)
}
logp.Info("Total number of logs received in current cycle : %d", receivedLogsInCycle)
receivedLogsInCycle = 0
counterLock.Unlock()
logp.Info("Total number of logs received : %d", logsReceived)
logp.Info("Events Flush Rate: %v messages per second", recordsPerSecond)
}
}
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"

"github.com/elastic/beats/libbeat/common"
hb "github.com/logrhythm/pubsubbeat/heartbeat"
)

type Config struct {
Expand All @@ -42,12 +43,16 @@ type Config struct {
FieldsTimestampName string `config:"fields_timestamp_name"`
FieldsTimestampFormat string `config:"fields_timestamp_format"`
}
HeartbeatInterval time.Duration `config:"heartbeatinterval"`
HeartbeatDisabled bool `config:"heartbeatdisabled"`
}

func GetDefaultConfig() Config {
config := Config{}
config.Subscription.Create = true
config.Json.FieldsTimestampName = "@timestamp"
config.HeartbeatInterval = hb.IntervalValue
config.HeartbeatDisabled = false
return config
}

Expand Down
21 changes: 21 additions & 0 deletions crypto/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = [
"decryption.go",
"encryption.go",
],
importpath = "github.com/logrhythm/siem/internal/pkg/crypto",
visibility = ["//:__subpackages__"],
)

go_test(
name = "go_default_test",
srcs = [
"decryption_test.go",
"encryption_test.go",
],
embed = [":go_default_library"],
deps = ["@com_github_stretchr_testify//assert:go_default_library"],
)
64 changes: 64 additions & 0 deletions crypto/decryption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package crypto

import (
"crypto/aes"
"crypto/cipher"
"encoding/base64"
"errors"
"strconv"
"strings"
)

// Decrypt function is used to decrypt the string
func Decrypt(securemess string) (decodedmess string, err error) {
if len(strings.TrimSpace(securemess)) == 0 {
return "", errors.New("string is empty")
}
decodedStr := strings.Split(securemess, "||")
if len(decodedStr) == 2 {
ver, err := strconv.Atoi(decodedStr[0])
if err != nil {
return "", err
}
switch ver {
case encV1:
decodedmess, err = decrypt1(decodedStr[1])
if err != nil {
return "", err
}
default:
return "", errors.New("invalid encryption")
}
}

return decodedmess, nil
}

func decrypt1(securemess string) (string, error) {
cipherText, err := base64.URLEncoding.DecodeString(securemess)
if err != nil {
return "", err
}

block, err := aes.NewCipher(cipherKey)
if err != nil {
return "", err
}

if len(cipherText) < aes.BlockSize {
err = errors.New("ciphertext block size is too short")
return "", err
}

//IV needs to be unique, but doesn't have to be secure.
//It's common to put it at the beginning of the ciphertext.
iv := cipherText[:aes.BlockSize]
cipherText = cipherText[aes.BlockSize:]

stream := cipher.NewCFBDecrypter(block, iv)
// XORKeyStream can work in-place if the two arguments are the same.
stream.XORKeyStream(cipherText, cipherText)

decodedmess := string(cipherText)
return decodedmess, nil
}
25 changes: 25 additions & 0 deletions crypto/decryption_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package crypto

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

const TestString = "encryptme"

func TestDecrypt(t *testing.T) {
t.Run("success decryption", func(t *testing.T) {
enryptedMess, err := Encrypt(TestString)
assert.Nil(t, err)
actual, err := Decrypt(enryptedMess)
assert.Nil(t, err)
assert.Equal(t, TestString, actual)
})
t.Run("failure decryption", func(t *testing.T) {
str := fmt.Sprintf("%d%s%s", encV1, "||", TestString)
_, err := Decrypt(str)
assert.NotNil(t, err)
})
}
47 changes: 47 additions & 0 deletions crypto/encryption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package crypto

import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"errors"
"io"
"strconv"
"strings"
)

var cipherKey = []byte("0123456789012345")

const (
encV1 = 1
)

//Encrypt function is used to encrypt the string
func Encrypt(message string) (encmess string, err error) {
if len(strings.TrimSpace(message)) == 0 {
return "", errors.New("string is empty")
}
plainText := []byte(message)

block, err := aes.NewCipher(cipherKey)
if err != nil {
return "", err
}

//IV needs to be unique, but doesn't have to be secure.
//It's common to put it at the beginning of the ciphertext.
cipherText := make([]byte, aes.BlockSize+len(plainText))
iv := cipherText[:aes.BlockSize]
if _, err = io.ReadFull(rand.Reader, iv); err != nil {
return "", err
}

stream := cipher.NewCFBEncrypter(block, iv)
stream.XORKeyStream(cipherText[aes.BlockSize:], plainText)

//returns to base64 encoded string
encmess = base64.URLEncoding.EncodeToString(cipherText)
finalEnc := strconv.Itoa(encV1) + "||" + encmess //fmt.Sprintf("%d%s%s", encV1, "||", encmess)
return finalEnc, nil
}
16 changes: 16 additions & 0 deletions crypto/encryption_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package crypto

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestEncrypt(t *testing.T) {
t.Run("success encryption", func(t *testing.T) {
enryptedMess, err := Encrypt(TestString)
assert.Nil(t, err)
_, err = Decrypt(enryptedMess)
assert.Nil(t, err)
})
}
Loading

0 comments on commit 021b62c

Please sign in to comment.