From 63bde7ed5045f33c977e4fb4ad08c36e82bd3f02 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 4 Apr 2023 12:46:31 -0400 Subject: [PATCH 1/7] Add a test to reproduce the race condition --- agent/consul/state/memdb_test.go | 95 ++++++++++++++++++++++ agent/consul/state/mock_publishFuncType.go | 39 +++++++++ 2 files changed, 134 insertions(+) create mode 100644 agent/consul/state/memdb_test.go create mode 100644 agent/consul/state/mock_publishFuncType.go diff --git a/agent/consul/state/memdb_test.go b/agent/consul/state/memdb_test.go new file mode 100644 index 000000000000..8398afbf9450 --- /dev/null +++ b/agent/consul/state/memdb_test.go @@ -0,0 +1,95 @@ +package state + +import ( + "fmt" + "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "testing" + "time" +) + +func testValidSchema() *memdb.DBSchema { + return &memdb.DBSchema{ + Tables: map[string]*memdb.TableSchema{ + "main": { + Name: "main", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "ID"}, + }, + "foo": { + Name: "foo", + Indexer: &memdb.StringFieldIndex{Field: "Foo"}, + }, + }, + }, + }, + } +} + +type TestObject struct { + ID string + Foo string +} + +func Test_txn_Commit(t *testing.T) { + db, err := memdb.NewMemDB(testValidSchema()) + require.NoError(t, err) + publishFunc := mockPublishFuncType{} + tx := txn{ + Txn: db.Txn(true), + Index: 0, + publish: publishFunc.Execute, + } + ch1 := make(chan struct{}) + ch2 := make(chan struct{}) + getCh := make(chan memdb.ResultIterator) + group := errgroup.Group{} + group.Go(func() error { + after := time.After(2 * time.Second) + select { + case <-ch1: + tx2 := txn{ + Txn: db.Txn(false), + Index: 0, + publish: publishFunc.Execute, + } + get, err := tx2.Get("main", "id") + if err != nil { + return err + } + close(ch2) + getCh <- get + case <-after: + close(ch2) + return fmt.Errorf("test timed 
out") + } + return nil + }) + + publishFunc.On("Execute", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + close(ch1) + <-ch2 + }).Return(nil) + + err = tx.Insert("main", TestObject{ID: "1", Foo: "foo"}) + require.NoError(t, err) + err = tx.Commit() + require.NoError(t, err) + get := <-getCh + require.NotNil(t, get) + next := get.Next() + require.NotNil(t, next) + + val := next.(TestObject) + require.Equal(t, val.ID, "1") + require.Equal(t, val.Foo, "foo") + + err = group.Wait() + require.NoError(t, err) + +} diff --git a/agent/consul/state/mock_publishFuncType.go b/agent/consul/state/mock_publishFuncType.go new file mode 100644 index 000000000000..e5923a1da347 --- /dev/null +++ b/agent/consul/state/mock_publishFuncType.go @@ -0,0 +1,39 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package state + +import mock "github.com/stretchr/testify/mock" + +// mockPublishFuncType is an autogenerated mock type for the publishFuncType type +type mockPublishFuncType struct { + mock.Mock +} + +// Execute provides a mock function with given fields: tx, changes +func (_m *mockPublishFuncType) Execute(tx ReadTxn, changes Changes) error { + ret := _m.Called(tx, changes) + + var r0 error + if rf, ok := ret.Get(0).(func(ReadTxn, Changes) error); ok { + r0 = rf(tx, changes) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTnewMockPublishFuncType interface { + mock.TestingT + Cleanup(func()) +} + +// newMockPublishFuncType creates a new instance of mockPublishFuncType. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
+func newMockPublishFuncType(t mockConstructorTestingTnewMockPublishFuncType) *mockPublishFuncType { + mock := &mockPublishFuncType{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} From b941d1f95151736905a102215acfa4894884251b Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 4 Apr 2023 12:47:27 -0400 Subject: [PATCH 2/7] Fix race condition by publishing the event after the commit and adding a lock to prevent out of order events. --- agent/consul/state/memdb.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index bd0e58b2d67c..3e572691c806 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -5,6 +5,7 @@ package state import ( "fmt" + "sync" "github.com/hashicorp/go-memdb" @@ -127,6 +128,9 @@ func (c *changeTrackerDB) WriteTxnRestore() *txn { return t } +//go:generate mockery --name publishFuncType --inpackage +type publishFuncType func(tx ReadTxn, changes Changes) error + // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. // // This can not be done with txn.Defer because the callback passed to Defer is @@ -140,7 +144,9 @@ type txn struct { // Index is stored so that it may be passed along to any subscribers as part // of a change event. Index uint64 - publish func(tx ReadTxn, changes Changes) error + publish publishFuncType + + commitLock sync.Mutex } // Commit first pushes changes to EventPublisher, then calls Commit on the @@ -161,6 +167,11 @@ func (tx *txn) Commit() error { } } + // This lock prevent concurrent commits to get published out of order. + tx.commitLock.Lock() + defer tx.commitLock.Unlock() + tx.Txn.Commit() + // publish may be nil if this is a read-only or WriteTxnRestore transaction. // In those cases changes should also be empty, and there will be nothing // to publish. 
@@ -169,8 +180,6 @@ func (tx *txn) Commit() error { return err } } - - tx.Txn.Commit() return nil } From 48fbb70a9e9eccd0da3262f32ce74ae5fa462ce3 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 4 Apr 2023 14:25:52 -0400 Subject: [PATCH 3/7] split publish to generate the list of events before committing the transaction. --- agent/consul/state/memdb.go | 40 ++++++++++++++-------- agent/consul/state/mock_publishFuncType.go | 20 ++++------- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 3e572691c806..6c5d49c9f359 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -4,7 +4,6 @@ package state import ( - "fmt" "sync" "github.com/hashicorp/go-memdb" @@ -94,21 +93,17 @@ func (c *changeTrackerDB) ReadTxn() *memdb.Txn { // data directly into the DB. These cases may use WriteTxnRestore. func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ - Txn: c.db.Txn(true), - Index: idx, - publish: c.publish, + Txn: c.db.Txn(true), + Index: idx, + publish: c.publish, + prePublish: c.processChanges, } t.Txn.TrackChanges() return t } -func (c *changeTrackerDB) publish(tx ReadTxn, changes Changes) error { - events, err := c.processChanges(tx, changes) - if err != nil { - return fmt.Errorf("failed generating events from changes: %v", err) - } +func (c *changeTrackerDB) publish(events []stream.Event) { c.publisher.Publish(events) - return nil } // WriteTxnRestore returns a wrapped RW transaction that should only be used in @@ -128,8 +123,10 @@ func (c *changeTrackerDB) WriteTxnRestore() *txn { return t } +type prePublishFuncType func(tx ReadTxn, changes Changes) ([]stream.Event, error) + //go:generate mockery --name publishFuncType --inpackage -type publishFuncType func(tx ReadTxn, changes Changes) error +type publishFuncType func(events []stream.Event) // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. 
// @@ -146,6 +143,8 @@ type txn struct { Index uint64 publish publishFuncType + prePublish prePublishFuncType + commitLock sync.Mutex } @@ -170,15 +169,26 @@ func (tx *txn) Commit() error { // This lock prevent concurrent commits to get published out of order. tx.commitLock.Lock() defer tx.commitLock.Unlock() + + var events []stream.Event + var err error + + // prePublish need to generate a list of events before the transaction is commited, + // as we loose the changes in the transaction after the call to Commit(). + if tx.prePublish != nil { + events, err = tx.prePublish(tx.Txn, changes) + if err != nil { + return err + } + } + tx.Txn.Commit() // publish may be nil if this is a read-only or WriteTxnRestore transaction. - // In those cases changes should also be empty, and there will be nothing + // In those cases events should also be empty, and there will be nothing // to publish. if tx.publish != nil { - if err := tx.publish(tx.Txn, changes); err != nil { - return err - } + tx.publish(events) } return nil } diff --git a/agent/consul/state/mock_publishFuncType.go b/agent/consul/state/mock_publishFuncType.go index e5923a1da347..bf1c6a5acb15 100644 --- a/agent/consul/state/mock_publishFuncType.go +++ b/agent/consul/state/mock_publishFuncType.go @@ -2,25 +2,19 @@ package state -import mock "github.com/stretchr/testify/mock" +import ( + stream "github.com/hashicorp/consul/agent/consul/stream" + mock "github.com/stretchr/testify/mock" +) // mockPublishFuncType is an autogenerated mock type for the publishFuncType type type mockPublishFuncType struct { mock.Mock } -// Execute provides a mock function with given fields: tx, changes -func (_m *mockPublishFuncType) Execute(tx ReadTxn, changes Changes) error { - ret := _m.Called(tx, changes) - - var r0 error - if rf, ok := ret.Get(0).(func(ReadTxn, Changes) error); ok { - r0 = rf(tx, changes) - } else { - r0 = ret.Error(0) - } - - return r0 +// Execute provides a mock function with given fields: events +func (_m 
*mockPublishFuncType) Execute(events []stream.Event) { + _m.Called(events) } type mockConstructorTestingTnewMockPublishFuncType interface { From 731c5ce468093ed7c02f07d4f19a144a8bd943bf Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 4 Apr 2023 14:51:05 -0400 Subject: [PATCH 4/7] add changelog --- .changelog/16871.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/16871.txt diff --git a/.changelog/16871.txt b/.changelog/16871.txt new file mode 100644 index 000000000000..c8874615a297 --- /dev/null +++ b/.changelog/16871.txt @@ -0,0 +1,3 @@ +```release-note:bug +Fix a race condition where an event is published before the data associated is commited to memdb.. +``` From de49cc5f28f3b438c222607f8c98207f751c8811 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 5 Apr 2023 08:47:15 -0400 Subject: [PATCH 5/7] remove extra func --- agent/consul/state/memdb.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 6c5d49c9f359..ad54ae6528ae 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -95,17 +95,13 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ Txn: c.db.Txn(true), Index: idx, - publish: c.publish, + publish: c.publisher.Publish, prePublish: c.processChanges, } t.Txn.TrackChanges() return t } -func (c *changeTrackerDB) publish(events []stream.Event) { - c.publisher.Publish(events) -} - // WriteTxnRestore returns a wrapped RW transaction that should only be used in // Restore where we need to replace the entire contents of the Store. 
// WriteTxnRestore uses a zero index since the whole restore doesn't really From 0e0f564697311d7926a1c723fae4120c26a0f3f1 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 12 Apr 2023 08:56:52 -0400 Subject: [PATCH 6/7] Apply suggestions from code review Co-authored-by: Dan Upton --- .changelog/16871.txt | 2 +- agent/consul/state/memdb.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.changelog/16871.txt b/.changelog/16871.txt index c8874615a297..f1167c45006a 100644 --- a/.changelog/16871.txt +++ b/.changelog/16871.txt @@ -1,3 +1,3 @@ ```release-note:bug -Fix a race condition where an event is published before the data associated is commited to memdb.. +Fix a race condition where an event is published before the associated data is committed to memdb. ``` diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index ad54ae6528ae..4ef90f006cdf 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -162,7 +162,7 @@ func (tx *txn) Commit() error { } } - // This lock prevent concurrent commits to get published out of order. + // This lock prevents events from concurrent transactions getting published out of order. tx.commitLock.Lock() defer tx.commitLock.Unlock() From 0ea6d14e3e8a3beea8b549838814f12d0736f42c Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 12 Apr 2023 08:57:19 -0400 Subject: [PATCH 7/7] add comment to explain test --- agent/consul/state/memdb_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/agent/consul/state/memdb_test.go b/agent/consul/state/memdb_test.go index 8398afbf9450..32d4edf1b333 100644 --- a/agent/consul/state/memdb_test.go +++ b/agent/consul/state/memdb_test.go @@ -36,6 +36,9 @@ type TestObject struct { Foo string } +// This test verifies that the new data in a TXN is committed by the time publishFunc is called. 
+// To do so, the publish func is mocked: a read on ch1 means that publish has been called and is blocked, +// while ch2 lets the test control the publish func and unblock it once it receives a signal. func Test_txn_Commit(t *testing.T) { db, err := memdb.NewMemDB(testValidSchema()) require.NoError(t, err)