Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
schmikei committed Jan 9, 2025
1 parent 3b6cebe commit 7cce249
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 107 deletions.
52 changes: 52 additions & 0 deletions receiver/azureblobrehydrationreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func TestConfigValidate(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("connection_string is required"),
},
Expand All @@ -53,6 +55,8 @@ func TestConfigValidate(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("container is required"),
},
Expand All @@ -67,6 +71,8 @@ func TestConfigValidate(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("starting_time is invalid: missing value"),
},
Expand All @@ -81,6 +87,8 @@ func TestConfigValidate(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("ending_time is invalid: missing value"),
},
Expand All @@ -95,6 +103,8 @@ func TestConfigValidate(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("starting_time is invalid: invalid timestamp"),
},
Expand All @@ -109,6 +119,8 @@ func TestConfigValidate(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("ending_time is invalid: invalid timestamp"),
},
Expand All @@ -123,6 +135,8 @@ func TestConfigValidate(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("ending_time must be at least one minute after starting_time"),
},
Expand All @@ -137,6 +151,8 @@ func TestConfigValidate(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Millisecond,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("poll_interval must be at least one second"),
},
Expand All @@ -151,9 +167,43 @@ func TestConfigValidate(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Second * 2,
PollTimeout: time.Millisecond,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("poll_timeout must be at least one second"),
},
{
desc: "Bad batch_size",
cfg: &Config{
ConnectionString: "connection_string",
Container: "container",
RootFolder: "root",
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 0,
PageSize: 1000,
},
expectErr: errors.New("batch_size must be greater than 0"),
},
{
desc: "Bad page_size",
cfg: &Config{
ConnectionString: "connection_string",
Container: "container",
RootFolder: "root",
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 0,
},
expectErr: errors.New("page_size must be greater than 0"),
},
{
desc: "Valid config",
cfg: &Config{
Expand All @@ -165,6 +215,8 @@ func TestConfigValidate(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: nil,
},
Expand Down
2 changes: 2 additions & 0 deletions receiver/azureblobrehydrationreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func Test_createDefaultConfig(t *testing.T) {
DeleteOnRead: false,
PollInterval: time.Minute,
PollTimeout: time.Second * 30,
BatchSize: 100,
PageSize: 1000,
}

componentCfg := createDefaultConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type BlobClient interface {

// StreamBlobs streams blobs to blobChan and errors to errChan. If errChan receives an error,
// the stream should generally be stopped.
StreamBlobs(ctx context.Context, container string, prefix, marker *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{})
StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{})
}

// AzureClient is an implementation of the BlobClient for Azure
Expand Down Expand Up @@ -118,9 +118,10 @@ type BlobResults struct {

// StreamBlobs streams blobs to blobChan and errors to errChan. If errChan receives an error,
// the stream should generally be stopped.
func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix, beginningMarker *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) {
func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) {
var marker *string
pager := a.azClient.NewListBlobsFlatPager(container, &azblob.ListBlobsFlatOptions{
Marker: beginningMarker,
Marker: marker,
Prefix: prefix,
MaxResults: &a.pageSize,
})
Expand Down Expand Up @@ -161,7 +162,7 @@ func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix,
if len(batch) == int(a.batchSize) {
blobChan <- &BlobResults{
Blobs: batch,
LastMarker: resp.NextMarker,
LastMarker: marker,
}
batch = []*BlobInfo{}
}
Expand All @@ -175,8 +176,9 @@ func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix,
emptyPollCount = 0
blobChan <- &BlobResults{
Blobs: batch,
LastMarker: resp.NextMarker,
LastMarker: marker,
}
marker = resp.NextMarker
}
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 26 additions & 25 deletions receiver/azureblobrehydrationreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"path/filepath"
"sync"
"time"

"github.com/observiq/bindplane-otel-collector/internal/rehydration"
Expand All @@ -43,13 +44,18 @@ type rehydrationReceiver struct {
checkpoint *rehydration.CheckPoint
checkpointStore rehydration.CheckpointStorer

blobChan chan *azureblob.BlobResults
errChan chan error
doneChan chan struct{}

mut *sync.Mutex

lastBlob *azureblob.BlobInfo
lastBlobTime *time.Time

startingTime time.Time
endingTime time.Time

started bool
cancelFunc context.CancelFunc
}

Expand Down Expand Up @@ -119,6 +125,10 @@ func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (*
checkpointStore: rehydration.NewNopStorage(),
startingTime: startingTime,
endingTime: endingTime,
blobChan: make(chan *azureblob.BlobResults),
errChan: make(chan error),
doneChan: make(chan struct{}),
mut: &sync.Mutex{},
}, nil
}

Expand All @@ -131,8 +141,6 @@ func (r *rehydrationReceiver) Start(ctx context.Context, host component.Host) er
}
r.checkpointStore = checkpointStore
}

r.started = true
go r.streamRehydrateBlobs(ctx)
return nil
}
Expand All @@ -144,21 +152,15 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error {
if r.cancelFunc != nil {
r.cancelFunc()
}
r.makeCheckpoint(ctx)
if err := r.makeCheckpoint(ctx); err != nil {
r.logger.Error("Error while saving checkpoint", zap.Error(err))
err = errors.Join(err, err)
}
err = errors.Join(err, r.checkpointStore.Close(ctx))
return err
}

// emptyPollLimit is the number of consecutive empty polling cycles that can
// occur before we stop polling.
const emptyPollLimit = 3

func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) {
ticker := time.NewTicker(r.cfg.PollInterval)
defer ticker.Stop()

var marker *string

checkpoint, err := r.checkpointStore.LoadCheckPoint(ctx, r.checkpointKey())
if err != nil {
r.logger.Warn("Error loading checkpoint, continuing without a previous checkpoint", zap.Error(err))
Expand All @@ -171,30 +173,28 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) {
prefix = &r.cfg.RootFolder
}

blobChan := make(chan *azureblob.BlobResults)
errChan := make(chan error)

cancelCtx, cancel := context.WithCancel(ctx)
r.cancelFunc = cancel
doneChan := make(chan struct{})

go r.azureClient.StreamBlobs(cancelCtx, r.cfg.Container, prefix, marker, errChan, blobChan, doneChan)
startTime := time.Now()
r.logger.Info("Starting rehydration", zap.Time("startTime", startTime))

go r.azureClient.StreamBlobs(cancelCtx, r.cfg.Container, prefix, r.errChan, r.blobChan, r.doneChan)

for {
select {
case <-ctx.Done():
return
case <-doneChan:
case <-r.doneChan:
r.logger.Info("Finished rehydrating blobs", zap.Int("durationSeconds", int(time.Since(startTime).Seconds())))
return
case err := <-errChan:
r.logger.Error("Error streaming blobs", zap.Error(err))
case err := <-r.errChan:
r.logger.Error("Error streaming blobs, stopping rehydration", zap.Error(err), zap.Int("durationSeconds", int(time.Since(startTime).Seconds())))
return
case br := <-blobChan:
r.logger.Debug("Received blobs from stream", zap.Int("number_of_blobs", len(br.Blobs)))
case br := <-r.blobChan:
r.rehydrateBlobs(ctx, br.Blobs)
}
}

}

func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) {
Expand Down Expand Up @@ -285,8 +285,9 @@ func (r *rehydrationReceiver) makeCheckpoint(ctx context.Context) error {
if r.lastBlob == nil || r.lastBlobTime == nil {
return nil
}

r.logger.Debug("Making checkpoint", zap.String("blob", r.lastBlob.Name), zap.Time("time", *r.lastBlobTime))
r.mut.Lock()
defer r.mut.Unlock()
r.checkpoint.UpdateCheckpoint(*r.lastBlobTime, r.lastBlob.Name)
r.checkpointStore.SaveCheckpoint(ctx, r.checkpointKey(), r.checkpoint)
return nil
Expand Down
Loading

0 comments on commit 7cce249

Please sign in to comment.