Skip to content

Commit

Permalink
feat: refactor it to use init config instead environment variables, a…
Browse files Browse the repository at this point in the history
…dd plugin max event size configuration

Signed-off-by: Igor Eulalio <[email protected]>
  • Loading branch information
IgorEulalio authored and poiana committed Jan 7, 2025
1 parent fd257f2 commit 7da2323
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 38 deletions.
48 changes: 11 additions & 37 deletions plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"log"
"os"
"regexp"
"strconv"
"sync"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
Expand Down Expand Up @@ -52,6 +51,7 @@ type PluginConfig struct {
BlobStorageContainerName string `json:"blob_storage_container_name" jsonschema:"title=blob_storage_container_name,description=The name of the Blob Storage container to use as checkpoint store"`
RateLimitEventsPerSecond int `json:"rate_limit_events_per_second" jsonschema:"title=rate_limit_events_per_second,description=The rate limit of events per second to read from EventHub"`
RateLimitBurst int `json:"rate_limit_burst" jsonschema:"title=rate_limit_burst,description=The rate limit burst of events to read from EventHub"`
MaxEventSize uint64 `json:"maxEventSize" jsonschema:"title=Maximum event size,description=Maximum size of single audit event (Default: 262144),default=262144"`
}

func (p *Plugin) Info() *plugins.Info {
Expand All @@ -65,45 +65,20 @@ func (p *Plugin) Info() *plugins.Info {
}
}

// Reset sets the configuration to its default values
func (p *PluginConfig) Reset() {
if i := os.Getenv("EVENTHUB_NAMESPACE_CONNECTION_STRING"); i != "" {
p.EventHubNamespaceConnectionString = i
}
if i := os.Getenv("EVENTHUB_NAME"); i != "" {
p.EventHubName = i
}
if i := os.Getenv("BLOB_STORAGE_CONNECTION_STRING"); i != "" {
p.BlobStorageConnectionString = i
}
if i := os.Getenv("BLOB_STORAGE_CONTAINER_NAME"); i != "" {
p.BlobStorageContainerName = i
}
if i := os.Getenv("RATE_LIMIT_EVENTS_PER_SECOND"); i != "" {
rateLimitEventsPerSecond, err := strconv.Atoi(i)
if err != nil {
return
}
p.RateLimitEventsPerSecond = rateLimitEventsPerSecond
} else {
p.RateLimitEventsPerSecond = 100
}
if i := os.Getenv("RATE_LIMIT_BURST"); i != "" {
rateLimitBurst, err := strconv.Atoi(i)
if err != nil {
return
}
p.RateLimitBurst = rateLimitBurst
} else {
p.RateLimitBurst = 200
}
func (p *PluginConfig) SetDefault() {
p.RateLimitBurst = 200
p.RateLimitEventsPerSecond = 100
}

// Resets sets the configuration to its default values
func (k *PluginConfig) Reset() {
k.MaxEventSize = uint64(sdk.DefaultEvtSize)
}

func (p *Plugin) Init(cfg string) error {
// read configuration
p.Plugin.Config.Reset()
p.Config.Reset()

p.Plugin.Config.Reset()
p.Config.SetDefault()
err := json.Unmarshal([]byte(cfg), &p.Config)
if err != nil {
return err
Expand Down Expand Up @@ -142,7 +117,6 @@ func (p *Plugin) OpenParams() ([]sdk.OpenParam, error) {

func (p *Plugin) Open(_ string) (source.Instance, error) {
ctx, cancel := context.WithCancel(context.Background())

checkClient, err := container.NewClientFromConnectionString(p.Config.BlobStorageConnectionString, p.Config.BlobStorageContainerName, nil)
if err != nil {
p.Logger.Printf("error opening connection to blob storage: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion shared/go/azure/eventhub/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p *Processor) Process(
defer closePartitionResources(partitionClient)

for {
receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Minute)
receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Second*10)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
Expand Down

0 comments on commit 7da2323

Please sign in to comment.