Skip to content

Commit

Permalink
converterChan
Browse files Browse the repository at this point in the history
  • Loading branch information
michalpristas committed Aug 15, 2024
1 parent 0ab2069 commit 1094d8f
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions pkg/stanza/adapter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ type Converter struct {

stopOnce sync.Once

// stopChan is an internal communication channel signalling stop was called
// converterChan is an internal communication channel signalling stop was called
// prevents sending to closed channels
stopChan chan struct{}
converterChan chan struct{}

// workerChan is an internal communication channel that gets the log
// entries from Batch() calls and it receives the data in workerLoop().
Expand Down Expand Up @@ -101,12 +101,12 @@ func (o workerCountOption) apply(c *Converter) {
func NewConverter(set component.TelemetrySettings, opts ...converterOption) *Converter {
set.Logger = set.Logger.With(zap.String("component", "converter"))
c := &Converter{
set: set,
workerChan: make(chan []*entry.Entry),
workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))),
pLogsChan: make(chan plog.Logs),
stopChan: make(chan struct{}),
flushChan: make(chan plog.Logs),
set: set,
workerChan: make(chan []*entry.Entry),
workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))),
pLogsChan: make(chan plog.Logs),
converterChan: make(chan struct{}),
flushChan: make(chan plog.Logs),
}
for _, opt := range opts {
opt.apply(c)
Expand All @@ -128,7 +128,7 @@ func (c *Converter) Start() {

func (c *Converter) Stop() {
c.stopOnce.Do(func() {
close(c.stopChan)
close(c.converterChan)

// close workerChan and wait for entries to be processed
close(c.workerChan)
Expand Down Expand Up @@ -227,7 +227,7 @@ func (c *Converter) flush(ctx context.Context, pLogs plog.Logs) error {
func (c *Converter) Batch(e []*entry.Entry) error {
// in case Stop was called do not process batch
select {
case <-c.stopChan:
case <-c.converterChan:
return errors.New("logs converter has been stopped")
default:
}
Expand Down

0 comments on commit 1094d8f

Please sign in to comment.