Skip to content

Commit

Permalink
Fix invalid write index updates in the persistent queue
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm committed Nov 20, 2023
1 parent 109048d commit f6ae621
Showing 1 changed file with 33 additions and 13 deletions.
46 changes: 33 additions & 13 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,20 +205,25 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
}

itemKey := getItemKey(pq.writeIndex)
pq.writeIndex++
newIndex := pq.writeIndex + 1

reqBuf, err := pq.marshaler(req)
if err != nil {
return err
}
err = pq.client.Batch(ctx,
storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex)),
storage.SetOperation(itemKey, reqBuf))

// Carry out a transaction where we both add the item and update the write index
setWriteIndexOp := storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex))
setItemOp := storage.SetOperation(itemKey, reqBuf)
if err := pq.client.Batch(ctx, setWriteIndexOp, setItemOp); err != nil {
return err
}

pq.writeIndex = newIndex
// Inform the loop that there's some data to process
pq.putChan <- struct{}{}

return err
return nil
}

// getNextItem pulls the next available item from the persistent storage; if none is found, returns (nil, false)
Expand All @@ -234,20 +239,35 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (QueueRequest[T],
if pq.readIndex == pq.writeIndex {
return QueueRequest[T]{}, false
}

// We need to do three storage operations here:
// * get the item from the database and unmarshal it
// * update the read index in the database
// * add the item index to the currently dispatched array in the database
// Each of those operations can fail and we need to handle the failures differently

index := pq.readIndex
// Increase here, so even if errors happen below, it always iterates
pq.readIndex++
itemKey := getItemKey(index)

// Update the in-memory state. This can cause an inconsistency if we fail to write these changes to storage.
// This is preferable to not being able to remove the item at all. In the majority of cases, these will be
// updated in the storage during the next get operation.
pq.readIndex++
pq.currentlyDispatchedItems = append(pq.currentlyDispatchedItems, index)
getOp := storage.GetOperation(getItemKey(index))
err := pq.client.Batch(ctx,
storage.SetOperation(readIndexKey, itemIndexToBytes(pq.readIndex)),
storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems)),
getOp)

// Update the state in storage. If this fails, we log the failure and continue.
if err := pq.client.Set(ctx, readIndexKey, itemIndexToBytes(pq.readIndex)); err != nil {
pq.set.Logger.Warn("Error updating read index, unable to get item from the queue", zap.Error(err), zap.Int("index", int(index)))
}
if err := pq.client.Set(ctx, currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems)); err != nil {
pq.set.Logger.Warn("Error adding item to the dispatched list", zap.Error(err), zap.Int("index", int(index)))
}

// Get the item itself and unmarshal it if successful.
rawItem, err := pq.client.Get(ctx, itemKey)
var request T
if err == nil {
request, err = pq.unmarshaler(getOp.Value)
request, err = pq.unmarshaler(rawItem)
}

if err != nil {
Expand Down

0 comments on commit f6ae621

Please sign in to comment.