Skip to content

Commit

Permalink
fix: update broken flux and perf tests (2.6) (#24618)
Browse files Browse the repository at this point in the history
* fix: update broken flux and perf tests (main-2.x) (#24617)

* chore: download repository key to file

* fix: broken perf tests

Some perf tests had to be temporarily disabled. Work is
needed in the perf_tests repositories to make them work
again.

* fix(tsi1/partition/test): fix data race in test code (#24613)

* fix(tsi1/partition/test): fix data race in test code

The TestPartition_Compact_Write_Fail test was not locking the partition
before changing the value of MaxLogFileSize. This PR exports the
partition's mutex so that the test can access and lock it. Alternatives
would require more changes, such as adding a setter method, if we need
to keep the mutex hidden.

* fixes #24042, for #24040

* chore: complete renaming of mutex in file and fix flux test

The flux test was also failing because it used a relative time range
instead of a fixed one.

---------

Co-authored-by: Phil Bracikowski <[email protected]>
  • Loading branch information
bnpfeife and philjb authored Jan 31, 2024
1 parent c0b135f commit bfac0a6
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 69 deletions.
2 changes: 1 addition & 1 deletion query/stdlib/universe/last_test.flux
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ testcase last_multi_shard {
)
result = csv.from(csv: input)
|> testing.load()
|> range(start: -3y)
|> range(start: 2015-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "pge_bill" and r._field == "bank")
|> last()
|> keep(columns: ["_time", "_value", "_field", "_measurement"])
Expand Down
6 changes: 3 additions & 3 deletions scripts/ci/perf-tests/iot.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
name: iot
name: iot
start_time: "2018-01-01T00:00:00Z"
end_time: "2018-01-01T12:00:00Z"
data:
Expand Down Expand Up @@ -35,7 +35,7 @@ query_tests:
- {"type": "build_query_file", "format": "http", "use_case": "ungrouped-agg", "query_type": "count"}
- {"type": "build_query_file", "format": "http", "use_case": "ungrouped-agg", "query_type": "sum"}

- {"type": "build_query_file", "format": "http", "use_case": "iot", "query_type": "fast-query-small-data"}
# - {"type": "build_query_file", "format": "http", "use_case": "iot", "query_type": "fast-query-small-data"}
- {"type": "build_query_file", "format": "http", "use_case": "iot", "query_type": "standalone-filter"}
- {"type": "build_query_file", "format": "http", "use_case": "iot", "query_type": "aggregate-keep"}
- {"type": "build_query_file", "format": "http", "use_case": "iot", "query_type": "aggregate-drop"}
Expand Down Expand Up @@ -79,7 +79,7 @@ query_tests:
- {"type": "build_query_file", "format": "flux-http", "use_case": "ungrouped-agg", "query_type": "count"}
- {"type": "build_query_file", "format": "flux-http", "use_case": "ungrouped-agg", "query_type": "sum"}

- {"type": "build_query_file", "format": "flux-http", "use_case": "iot", "query_type": "fast-query-small-data"}
# - {"type": "build_query_file", "format": "flux-http", "use_case": "iot", "query_type": "fast-query-small-data"}
- {"type": "build_query_file", "format": "flux-http", "use_case": "iot", "query_type": "standalone-filter"}
- {"type": "build_query_file", "format": "flux-http", "use_case": "iot", "query_type": "aggregate-keep"}
- {"type": "build_query_file", "format": "flux-http", "use_case": "iot", "query_type": "aggregate-drop"}
Expand Down
18 changes: 9 additions & 9 deletions scripts/ci/run_perftest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ install_influxdb() {

install_telegraf() {
# Install Telegraf
wget -qO- https://repos.influxdata.com/influxdb.key | apt-key add -
echo "deb https://repos.influxdata.com/ubuntu focal stable" | tee /etc/apt/sources.list.d/influxdb.list
curl -fLO https://repos.influxdata.com/influxdata-archive_compat.key
echo '393e8779c89ac8d958f81f942f9ad7fb82a25e133faddaf92e15b16e6ac9ce4c influxdata-archive_compat.key' | sha256sum -c && cat influxdata-archive_compat.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg > /dev/null
echo 'deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg] https://repos.influxdata.com/debian stable main' | sudo tee /etc/apt/sources.list.d/influxdata.list

DEBIAN_FRONTEND=noninteractive apt-get update
DEBIAN_FRONTEND=noninteractive apt-get install -y git jq telegraf awscli
Expand Down Expand Up @@ -100,7 +101,7 @@ EOF

install_go() {
# install golang latest version
go_endpoint="go1.17.3.linux-amd64.tar.gz"
go_endpoint="go1.17.11.linux-amd64.tar.gz"

wget "https://dl.google.com/go/$go_endpoint" -O "$working_dir/$go_endpoint"
rm -rf /usr/local/go
Expand All @@ -118,13 +119,12 @@ install_go() {

install_go_bins() {
# install influxdb-comparisons cmds
go get \
github.com/influxdata/influxdb-comparisons/cmd/bulk_data_gen \
github.com/influxdata/influxdb-comparisons/cmd/bulk_load_influx \
github.com/influxdata/influxdb-comparisons/cmd/bulk_query_gen \
github.com/influxdata/influxdb-comparisons/cmd/query_benchmarker_influxdb
go install github.com/influxdata/influxdb-comparisons/cmd/bulk_data_gen@latest
go install github.com/influxdata/influxdb-comparisons/cmd/bulk_load_influx@latest
go install github.com/influxdata/influxdb-comparisons/cmd/bulk_query_gen@latest
go install github.com/influxdata/influxdb-comparisons/cmd/query_benchmarker_influxdb@latest
# install yq
go get github.com/mikefarah/yq/v4
go install github.com/mikefarah/yq/v4@v4.23.1
}

# Helper functions containing common logic
Expand Down
113 changes: 57 additions & 56 deletions tsdb/index/tsi1/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ const ManifestFileName = "MANIFEST"

// Partition represents a collection of layered index files and WAL.
type Partition struct {
mu sync.RWMutex
// exported for tests
Mu sync.RWMutex
opened bool

sfile *tsdb.SeriesFile // series lookup file
Expand Down Expand Up @@ -150,8 +151,8 @@ var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST")

// Open opens the partition.
func (p *Partition) Open() (rErr error) {
p.mu.Lock()
defer p.mu.Unlock()
p.Mu.Lock()
defer p.Mu.Unlock()

p.closing = make(chan struct{})

Expand Down Expand Up @@ -339,8 +340,8 @@ func (p *Partition) buildSeriesSet() error {

// CurrentCompactionN returns the number of compactions currently running.
func (p *Partition) CurrentCompactionN() int {
p.mu.RLock()
defer p.mu.RUnlock()
p.Mu.RLock()
defer p.Mu.RUnlock()
return p.currentCompactionN
}

Expand All @@ -367,8 +368,8 @@ func (p *Partition) Close() error {
p.Wait()

// Lock index and close remaining
p.mu.Lock()
defer p.mu.Unlock()
p.Mu.Lock()
defer p.Mu.Unlock()

var err error

Expand Down Expand Up @@ -402,8 +403,8 @@ func (p *Partition) SeriesFile() *tsdb.SeriesFile { return p.sfile }

// NextSequence returns the next file identifier.
func (p *Partition) NextSequence() int {
p.mu.Lock()
defer p.mu.Unlock()
p.Mu.Lock()
defer p.Mu.Unlock()
return p.nextSequence()
}

Expand Down Expand Up @@ -445,8 +446,8 @@ func (p *Partition) manifest(newFileSet *FileSet) *Manifest {

// SetManifestPathForTest is only to force a bad path in testing
func (p *Partition) SetManifestPathForTest(path string) {
p.mu.Lock()
defer p.mu.Unlock()
p.Mu.Lock()
defer p.Mu.Unlock()
p.manifestPathFn = func() string { return path }
}

Expand All @@ -457,16 +458,16 @@ func (p *Partition) WithLogger(logger *zap.Logger) {

// SetFieldSet sets a shared field set from the engine.
func (p *Partition) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
p.mu.Lock()
p.Mu.Lock()
p.fieldset = fs
p.mu.Unlock()
p.Mu.Unlock()
}

// FieldSet returns the fieldset.
func (p *Partition) FieldSet() *tsdb.MeasurementFieldSet {
p.mu.Lock()
p.Mu.Lock()
fs := p.fieldset
p.mu.Unlock()
p.Mu.Unlock()
return fs
}

Expand All @@ -476,8 +477,8 @@ func (p *Partition) RetainFileSet() (*FileSet, error) {
case <-p.closing:
return nil, tsdb.ErrIndexClosing
default:
p.mu.RLock()
defer p.mu.RUnlock()
p.Mu.RLock()
defer p.Mu.RUnlock()
return p.retainFileSet(), nil
}
}
Expand All @@ -490,8 +491,8 @@ func (p *Partition) retainFileSet() *FileSet {

// FileN returns the active files in the file set.
func (p *Partition) FileN() int {
p.mu.RLock()
defer p.mu.RUnlock()
p.Mu.RLock()
defer p.Mu.RUnlock()
return len(p.fileSet.files)
}

Expand Down Expand Up @@ -637,8 +638,8 @@ func (p *Partition) DropMeasurement(name []byte) error {
// Delete key if not already deleted.
if !k.Deleted() {
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
p.Mu.RLock()
defer p.Mu.RUnlock()
return p.activeLogFile.DeleteTagKey(name, k.Key())
}(); err != nil {
return err
Expand All @@ -650,8 +651,8 @@ func (p *Partition) DropMeasurement(name []byte) error {
for v := vitr.Next(); v != nil; v = vitr.Next() {
if !v.Deleted() {
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
p.Mu.RLock()
defer p.Mu.RUnlock()
return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
}(); err != nil {
return err
Expand All @@ -673,8 +674,8 @@ func (p *Partition) DropMeasurement(name []byte) error {
break
}
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
p.Mu.RLock()
defer p.Mu.RUnlock()
return p.activeLogFile.DeleteSeriesID(elem.SeriesID)
}(); err != nil {
return err
Expand All @@ -687,8 +688,8 @@ func (p *Partition) DropMeasurement(name []byte) error {

// Mark measurement as deleted.
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
p.Mu.RLock()
defer p.Mu.RUnlock()
return p.activeLogFile.DeleteMeasurement(name)
}(); err != nil {
return err
Expand Down Expand Up @@ -720,14 +721,14 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode
defer fs.Release()

// Ensure fileset cannot change during insert.
p.mu.RLock()
p.Mu.RLock()
// Insert series into log file.
ids, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice)
if err != nil {
p.mu.RUnlock()
p.Mu.RUnlock()
return nil, err
}
p.mu.RUnlock()
p.Mu.RUnlock()

if err := p.CheckLogFile(); err != nil {
return nil, err
Expand All @@ -738,8 +739,8 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode
func (p *Partition) DropSeries(seriesID uint64) error {
// Delete series from index.
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
p.Mu.RLock()
defer p.Mu.RUnlock()
return p.activeLogFile.DeleteSeriesID(seriesID)
}(); err != nil {
return err
Expand Down Expand Up @@ -904,14 +905,14 @@ func (p *Partition) AssignShard(k string, shardID uint64) {}

// Compact requests a compaction of log files.
func (p *Partition) Compact() {
p.mu.Lock()
defer p.mu.Unlock()
p.Mu.Lock()
defer p.Mu.Unlock()
p.compact()
}

func (p *Partition) DisableCompactions() {
p.mu.Lock()
defer p.mu.Unlock()
p.Mu.Lock()
defer p.Mu.Unlock()
p.compactionsDisabled++

select {
Expand All @@ -927,8 +928,8 @@ func (p *Partition) DisableCompactions() {
}

func (p *Partition) EnableCompactions() {
p.mu.Lock()
defer p.mu.Unlock()
p.Mu.Lock()
defer p.Mu.Unlock()

// Already enabled?
if p.compactionsEnabled() {
Expand All @@ -946,9 +947,9 @@ func (p *Partition) runPeriodicCompaction() {
p.Compact()

// Avoid a race when using Reopen in tests
p.mu.RLock()
p.Mu.RLock()
closing := p.closing
p.mu.RUnlock()
p.Mu.RUnlock()

// check for compactions once an hour (usually not necessary but a nice safety check)
t := time.NewTicker(1 * time.Hour)
Expand All @@ -971,8 +972,8 @@ func (p *Partition) runPeriodicCompaction() {
// If checkRunning = true, only count as needing a compaction if there is not a compaction already
// in progress for the level that would be compacted
func (p *Partition) NeedsCompaction(checkRunning bool) bool {
p.mu.RLock()
defer p.mu.RUnlock()
p.Mu.RLock()
defer p.Mu.RUnlock()
if p.needsLogCompaction() {
return true
}
Expand Down Expand Up @@ -1025,10 +1026,10 @@ func (p *Partition) compact() {
p.currentCompactionN++
go func() {
p.compactLogFile(logFile)
p.mu.Lock()
p.Mu.Lock()
p.currentCompactionN--
p.levelCompacting[0] = false
p.mu.Unlock()
p.Mu.Unlock()
p.Compact()
}()
}
Expand Down Expand Up @@ -1068,10 +1069,10 @@ func (p *Partition) compact() {
p.compactToLevel(files, level+1, interrupt)

// Ensure compaction lock for the level is released.
p.mu.Lock()
p.Mu.Lock()
p.levelCompacting[level] = false
p.currentCompactionN--
p.mu.Unlock()
p.Mu.Unlock()

// Check for new compactions
p.Compact()
Expand Down Expand Up @@ -1149,8 +1150,8 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch

// Obtain lock to swap in index file and write manifest.
if err := func() (rErr error) {
p.mu.Lock()
defer p.mu.Unlock()
p.Mu.Lock()
defer p.Mu.Unlock()

// Replace previous files with new index file.
newFileSet := p.fileSet.MustReplace(IndexFiles(files).Files(), file)
Expand Down Expand Up @@ -1214,17 +1215,17 @@ func (p *Partition) needsLogCompaction() bool {
func (p *Partition) CheckLogFile() error {
// Check log file under read lock.
needsCompaction := func() bool {
p.mu.RLock()
defer p.mu.RUnlock()
p.Mu.RLock()
defer p.Mu.RUnlock()
return p.needsLogCompaction()
}()
if !needsCompaction {
return nil
}

// If file size exceeded then recheck under write lock and swap files.
p.mu.Lock()
defer p.mu.Unlock()
p.Mu.Lock()
defer p.Mu.Unlock()
return p.checkLogFile()
}

Expand Down Expand Up @@ -1254,9 +1255,9 @@ func (p *Partition) compactLogFile(logFile *LogFile) {
return
}

p.mu.Lock()
p.Mu.Lock()
interrupt := p.compactionInterrupt
p.mu.Unlock()
p.Mu.Unlock()

start := time.Now()

Expand Down Expand Up @@ -1306,8 +1307,8 @@ func (p *Partition) compactLogFile(logFile *LogFile) {

// Obtain lock to swap in index file and write manifest.
if err := func() (rErr error) {
p.mu.Lock()
defer p.mu.Unlock()
p.Mu.Lock()
defer p.Mu.Unlock()

// Replace previous log file with index file.
newFileSet := p.fileSet.MustReplace([]File{logFile}, file)
Expand Down
2 changes: 2 additions & 0 deletions tsdb/index/tsi1/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func TestPartition_Compact_Write_Fail(t *testing.T) {
t.Fatalf("error closing partition: %v", err)
}
}()
p.Partition.Mu.Lock()
p.Partition.MaxLogFileSize = -1
p.Partition.Mu.Unlock()
fileN := p.FileN()
p.Compact()
if (1 + fileN) != p.FileN() {
Expand Down

0 comments on commit bfac0a6

Please sign in to comment.