Skip to content

Commit

Permalink
Fix invalid write index updates in the persistent queue
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm committed Nov 21, 2023
1 parent 109048d commit 725821b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
25 changes: 25 additions & 0 deletions .chloggen/fix_persistentstorage_index-updates.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix invalid write index updates in the persistent queue

# One or more tracking issues or pull requests related to the change
issues: [8115]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
15 changes: 10 additions & 5 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,20 +205,25 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
}

itemKey := getItemKey(pq.writeIndex)
pq.writeIndex++
newIndex := pq.writeIndex + 1

reqBuf, err := pq.marshaler(req)
if err != nil {
return err
}
err = pq.client.Batch(ctx,
storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex)),
storage.SetOperation(itemKey, reqBuf))

// Carry out a transaction where we both add the item and update the write index
setWriteIndexOp := storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex))
setItemOp := storage.SetOperation(itemKey, reqBuf)
if err := pq.client.Batch(ctx, setWriteIndexOp, setItemOp); err != nil {
return err
}

pq.writeIndex = newIndex
// Inform the loop that there's some data to process
pq.putChan <- struct{}{}

return err
return nil
}

// getNextItem pulls the next available item from the persistent storage; if none is found, returns (nil, false)
Expand Down
6 changes: 6 additions & 0 deletions exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,9 @@ func TestPersistentQueue_StorageFull(t *testing.T) {
reqCount++
}

// Check that the size is correct
require.Equal(t, reqCount, ps.Size(), "Size must be equal to the number of items inserted")

// Manually set the storage to only have a small amount of free space left
newMaxSize := client.GetSizeInBytes() + freeSpaceInBytes
client.SetMaxSizeInBytes(newMaxSize)
Expand All @@ -632,6 +635,9 @@ func TestPersistentQueue_StorageFull(t *testing.T) {
require.Error(t, ps.Offer(context.Background(), req))

// Take out all the items
// Getting the first item fails, as we can't update the state in storage, so we just delete it without returning it
// Subsequent items succeed, as deleting the first item frees enough space for the state update
reqCount--
for i := reqCount; i > 0; i-- {
request, found := ps.Poll()
require.True(t, found)
Expand Down

0 comments on commit 725821b

Please sign in to comment.