diff --git a/query/stdlib/universe/last_test.flux b/query/stdlib/universe/last_test.flux
index f4e0d731042..3bbdab42b18 100644
--- a/query/stdlib/universe/last_test.flux
+++ b/query/stdlib/universe/last_test.flux
@@ -48,7 +48,7 @@ testcase last_multi_shard {
         )
     result = csv.from(csv: input)
         |> testing.load()
-        |> range(start: -3y)
+        |> range(start: 2015-01-01T00:00:00Z)
         |> filter(fn: (r) => r._measurement == "pge_bill" and r._field == "bank")
         |> last()
         |> keep(columns: ["_time", "_value", "_field", "_measurement"])
diff --git a/scripts/ci/perf-tests/iot.yaml b/scripts/ci/perf-tests/iot.yaml
index a20e8d63384..c2745912104 100644
--- a/scripts/ci/perf-tests/iot.yaml
+++ b/scripts/ci/perf-tests/iot.yaml
@@ -1,5 +1,5 @@
 ---
-name: iot 
+name: iot
 start_time: "2018-01-01T00:00:00Z"
 end_time: "2018-01-01T12:00:00Z"
 data:
@@ -35,7 +35,7 @@ query_tests:
   - {"type": "build_query_file", "format": "http", "use_case": "ungrouped-agg", "query_type": "count"}
  - {"type": "build_query_file", "format": "http", "use_case": "ungrouped-agg", "query_type": "sum"}

-  - {"type": "build_query_file", "format": "http", "use_case": "iot", "query_type": "fast-query-small-data"}
+  # - {"type": "build_query_file", "format": "http", "use_case": "iot", "query_type": "fast-query-small-data"}
   - {"type": "build_query_file", "format": "http", "use_case": "iot", "query_type": "standalone-filter"}
   - {"type": "build_query_file", "format": "http", "use_case": "iot", "query_type": "aggregate-keep"}
   - {"type": "build_query_file", "format": "http", "use_case": "iot", "query_type": "aggregate-drop"}
@@ -79,7 +79,7 @@ query_tests:
   - {"type": "build_query_file", "format": "flux-http", "use_case": "ungrouped-agg", "query_type": "count"}
   - {"type": "build_query_file", "format": "flux-http", "use_case": "ungrouped-agg", "query_type": "sum"}

-  - {"type": "build_query_file", "format": "flux-http", "use_case": "iot", "query_type": "fast-query-small-data"}
+  # - {"type": "build_query_file", "format": "flux-http", "use_case": "iot", "query_type": "fast-query-small-data"}
   - {"type": "build_query_file", "format": "flux-http", "use_case": "iot", "query_type": "standalone-filter"}
   - {"type": "build_query_file", "format": "flux-http", "use_case": "iot", "query_type": "aggregate-keep"}
   - {"type": "build_query_file", "format": "flux-http", "use_case": "iot", "query_type": "aggregate-drop"}
diff --git a/scripts/ci/run_perftest.sh b/scripts/ci/run_perftest.sh
index d52631974c6..ca1c86a8d90 100755
--- a/scripts/ci/run_perftest.sh
+++ b/scripts/ci/run_perftest.sh
@@ -27,8 +27,9 @@ install_influxdb() {

 install_telegraf() {
   # Install Telegraf
-  wget -qO- https://repos.influxdata.com/influxdb.key | apt-key add -
-  echo "deb https://repos.influxdata.com/ubuntu focal stable" | tee /etc/apt/sources.list.d/influxdb.list
+  curl -fLO https://repos.influxdata.com/influxdata-archive_compat.key
+  echo '393e8779c89ac8d958f81f942f9ad7fb82a25e133faddaf92e15b16e6ac9ce4c influxdata-archive_compat.key' | sha256sum -c && cat influxdata-archive_compat.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg > /dev/null
+  echo 'deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg] https://repos.influxdata.com/debian stable main' | sudo tee /etc/apt/sources.list.d/influxdata.list

   DEBIAN_FRONTEND=noninteractive apt-get update
   DEBIAN_FRONTEND=noninteractive apt-get install -y git jq telegraf awscli
@@ -100,7 +101,7 @@ EOF

 install_go() {
   # install golang latest version
-  go_endpoint="go1.17.3.linux-amd64.tar.gz"
+  go_endpoint="go1.17.11.linux-amd64.tar.gz"
   wget "https://dl.google.com/go/$go_endpoint" -O "$working_dir/$go_endpoint"

   rm -rf /usr/local/go
@@ -118,13 +119,12 @@ install_go() {

 install_go_bins() {
   # install influxdb-comparisons cmds
-  go get \
-    github.com/influxdata/influxdb-comparisons/cmd/bulk_data_gen \
-    github.com/influxdata/influxdb-comparisons/cmd/bulk_load_influx \
-    github.com/influxdata/influxdb-comparisons/cmd/bulk_query_gen \
-    github.com/influxdata/influxdb-comparisons/cmd/query_benchmarker_influxdb
+  go install github.com/influxdata/influxdb-comparisons/cmd/bulk_data_gen@latest
+  go install github.com/influxdata/influxdb-comparisons/cmd/bulk_load_influx@latest
+  go install github.com/influxdata/influxdb-comparisons/cmd/bulk_query_gen@latest
+  go install github.com/influxdata/influxdb-comparisons/cmd/query_benchmarker_influxdb@latest
   # install yq
-  go get github.com/mikefarah/yq/v4
+  go install github.com/mikefarah/yq/v4@v4.23.1
 }

 # Helper functions containing common logic
diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go
index 687c0be8d81..0d3a76d3da6 100644
--- a/tsdb/index/tsi1/partition.go
+++ b/tsdb/index/tsi1/partition.go
@@ -40,7 +40,8 @@ const ManifestFileName = "MANIFEST"

 // Partition represents a collection of layered index files and WAL.
 type Partition struct {
-	mu     sync.RWMutex
+	// exported for tests
+	Mu     sync.RWMutex
 	opened bool

 	sfile *tsdb.SeriesFile // series lookup file
@@ -150,8 +151,8 @@ var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST")

 // Open opens the partition.
 func (p *Partition) Open() (rErr error) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	p.Mu.Lock()
+	defer p.Mu.Unlock()

 	p.closing = make(chan struct{})

@@ -339,8 +340,8 @@ func (p *Partition) buildSeriesSet() error {

 // CurrentCompactionN returns the number of compactions currently running.
 func (p *Partition) CurrentCompactionN() int {
-	p.mu.RLock()
-	defer p.mu.RUnlock()
+	p.Mu.RLock()
+	defer p.Mu.RUnlock()
 	return p.currentCompactionN
 }

@@ -367,8 +368,8 @@ func (p *Partition) Close() error {
 	p.Wait()

 	// Lock index and close remaining
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	p.Mu.Lock()
+	defer p.Mu.Unlock()

 	var err error

@@ -402,8 +403,8 @@ func (p *Partition) SeriesFile() *tsdb.SeriesFile { return p.sfile }

 // NextSequence returns the next file identifier.
 func (p *Partition) NextSequence() int {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	p.Mu.Lock()
+	defer p.Mu.Unlock()
 	return p.nextSequence()
 }

@@ -445,8 +446,8 @@ func (p *Partition) manifest(newFileSet *FileSet) *Manifest {

 // SetManifestPathForTest is only to force a bad path in testing
 func (p *Partition) SetManifestPathForTest(path string) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	p.Mu.Lock()
+	defer p.Mu.Unlock()
 	p.manifestPathFn = func() string { return path }
 }

@@ -457,16 +458,16 @@ func (p *Partition) WithLogger(logger *zap.Logger) {

 // SetFieldSet sets a shared field set from the engine.
 func (p *Partition) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
-	p.mu.Lock()
+	p.Mu.Lock()
 	p.fieldset = fs
-	p.mu.Unlock()
+	p.Mu.Unlock()
 }

 // FieldSet returns the fieldset.
 func (p *Partition) FieldSet() *tsdb.MeasurementFieldSet {
-	p.mu.Lock()
+	p.Mu.Lock()
 	fs := p.fieldset
-	p.mu.Unlock()
+	p.Mu.Unlock()
 	return fs
 }

@@ -476,8 +477,8 @@ func (p *Partition) RetainFileSet() (*FileSet, error) {
 	case <-p.closing:
 		return nil, tsdb.ErrIndexClosing
 	default:
-		p.mu.RLock()
-		defer p.mu.RUnlock()
+		p.Mu.RLock()
+		defer p.Mu.RUnlock()
 		return p.retainFileSet(), nil
 	}
 }
@@ -490,8 +491,8 @@ func (p *Partition) retainFileSet() *FileSet {

 // FileN returns the active files in the file set.
 func (p *Partition) FileN() int {
-	p.mu.RLock()
-	defer p.mu.RUnlock()
+	p.Mu.RLock()
+	defer p.Mu.RUnlock()
 	return len(p.fileSet.files)
 }

@@ -637,8 +638,8 @@ func (p *Partition) DropMeasurement(name []byte) error {
 			// Delete key if not already deleted.
 			if !k.Deleted() {
 				if err := func() error {
-					p.mu.RLock()
-					defer p.mu.RUnlock()
+					p.Mu.RLock()
+					defer p.Mu.RUnlock()
 					return p.activeLogFile.DeleteTagKey(name, k.Key())
 				}(); err != nil {
 					return err
@@ -650,8 +651,8 @@ func (p *Partition) DropMeasurement(name []byte) error {
 			for v := vitr.Next(); v != nil; v = vitr.Next() {
 				if !v.Deleted() {
 					if err := func() error {
-						p.mu.RLock()
-						defer p.mu.RUnlock()
+						p.Mu.RLock()
+						defer p.Mu.RUnlock()
 						return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
 					}(); err != nil {
 						return err
@@ -673,8 +674,8 @@ func (p *Partition) DropMeasurement(name []byte) error {
 				break
 			}
 			if err := func() error {
-				p.mu.RLock()
-				defer p.mu.RUnlock()
+				p.Mu.RLock()
+				defer p.Mu.RUnlock()
 				return p.activeLogFile.DeleteSeriesID(elem.SeriesID)
 			}(); err != nil {
 				return err
@@ -687,8 +688,8 @@ func (p *Partition) DropMeasurement(name []byte) error {

 	// Mark measurement as deleted.
 	if err := func() error {
-		p.mu.RLock()
-		defer p.mu.RUnlock()
+		p.Mu.RLock()
+		defer p.Mu.RUnlock()
 		return p.activeLogFile.DeleteMeasurement(name)
 	}(); err != nil {
 		return err
@@ -720,14 +721,14 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode
 	defer fs.Release()

 	// Ensure fileset cannot change during insert.
-	p.mu.RLock()
+	p.Mu.RLock()
 	// Insert series into log file.
 	ids, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice)
 	if err != nil {
-		p.mu.RUnlock()
+		p.Mu.RUnlock()
 		return nil, err
 	}
-	p.mu.RUnlock()
+	p.Mu.RUnlock()

 	if err := p.CheckLogFile(); err != nil {
 		return nil, err
@@ -738,8 +739,8 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode
 func (p *Partition) DropSeries(seriesID uint64) error {
 	// Delete series from index.
 	if err := func() error {
-		p.mu.RLock()
-		defer p.mu.RUnlock()
+		p.Mu.RLock()
+		defer p.Mu.RUnlock()
 		return p.activeLogFile.DeleteSeriesID(seriesID)
 	}(); err != nil {
 		return err
@@ -904,14 +905,14 @@ func (p *Partition) AssignShard(k string, shardID uint64) {}

 // Compact requests a compaction of log files.
 func (p *Partition) Compact() {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	p.Mu.Lock()
+	defer p.Mu.Unlock()
 	p.compact()
 }

 func (p *Partition) DisableCompactions() {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	p.Mu.Lock()
+	defer p.Mu.Unlock()
 	p.compactionsDisabled++

 	select {
@@ -927,8 +928,8 @@ func (p *Partition) DisableCompactions() {
 }

 func (p *Partition) EnableCompactions() {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	p.Mu.Lock()
+	defer p.Mu.Unlock()

 	// Already enabled?
 	if p.compactionsEnabled() {
@@ -946,9 +947,9 @@ func (p *Partition) runPeriodicCompaction() {
 	p.Compact()

 	// Avoid a race when using Reopen in tests
-	p.mu.RLock()
+	p.Mu.RLock()
 	closing := p.closing
-	p.mu.RUnlock()
+	p.Mu.RUnlock()

 	// check for compactions once an hour (usually not necessary but a nice safety check)
 	t := time.NewTicker(1 * time.Hour)
@@ -971,8 +972,8 @@ func (p *Partition) runPeriodicCompaction() {
 // If checkRunning = true, only count as needing a compaction if there is not a compaction already
 // in progress for the level that would be compacted
 func (p *Partition) NeedsCompaction(checkRunning bool) bool {
-	p.mu.RLock()
-	defer p.mu.RUnlock()
+	p.Mu.RLock()
+	defer p.Mu.RUnlock()
 	if p.needsLogCompaction() {
 		return true
 	}
@@ -1025,10 +1026,10 @@ func (p *Partition) compact() {
 		p.currentCompactionN++
 		go func() {
 			p.compactLogFile(logFile)
-			p.mu.Lock()
+			p.Mu.Lock()
 			p.currentCompactionN--
 			p.levelCompacting[0] = false
-			p.mu.Unlock()
+			p.Mu.Unlock()
 			p.Compact()
 		}()
 	}
@@ -1068,10 +1069,10 @@ func (p *Partition) compact() {
 			p.compactToLevel(files, level+1, interrupt)

 			// Ensure compaction lock for the level is released.
-			p.mu.Lock()
+			p.Mu.Lock()
 			p.levelCompacting[level] = false
 			p.currentCompactionN--
-			p.mu.Unlock()
+			p.Mu.Unlock()

 			// Check for new compactions
 			p.Compact()
@@ -1149,8 +1150,8 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch

 	// Obtain lock to swap in index file and write manifest.
 	if err := func() (rErr error) {
-		p.mu.Lock()
-		defer p.mu.Unlock()
+		p.Mu.Lock()
+		defer p.Mu.Unlock()

 		// Replace previous files with new index file.
 		newFileSet := p.fileSet.MustReplace(IndexFiles(files).Files(), file)
@@ -1214,8 +1215,8 @@ func (p *Partition) needsLogCompaction() bool {
 func (p *Partition) CheckLogFile() error {
 	// Check log file under read lock.
 	needsCompaction := func() bool {
-		p.mu.RLock()
-		defer p.mu.RUnlock()
+		p.Mu.RLock()
+		defer p.Mu.RUnlock()
 		return p.needsLogCompaction()
 	}()
 	if !needsCompaction {
@@ -1223,8 +1224,8 @@ func (p *Partition) CheckLogFile() error {
 	}

 	// If file size exceeded then recheck under write lock and swap files.
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	p.Mu.Lock()
+	defer p.Mu.Unlock()
 	return p.checkLogFile()
 }

@@ -1254,9 +1255,9 @@ func (p *Partition) compactLogFile(logFile *LogFile) {
 		return
 	}

-	p.mu.Lock()
+	p.Mu.Lock()
 	interrupt := p.compactionInterrupt
-	p.mu.Unlock()
+	p.Mu.Unlock()

 	start := time.Now()

@@ -1306,8 +1307,8 @@ func (p *Partition) compactLogFile(logFile *LogFile) {

 	// Obtain lock to swap in index file and write manifest.
 	if err := func() (rErr error) {
-		p.mu.Lock()
-		defer p.mu.Unlock()
+		p.Mu.Lock()
+		defer p.Mu.Unlock()

 		// Replace previous log file with index file.
 		newFileSet := p.fileSet.MustReplace([]File{logFile}, file)
diff --git a/tsdb/index/tsi1/partition_test.go b/tsdb/index/tsi1/partition_test.go
index 0c7c2064533..ae703b109e0 100644
--- a/tsdb/index/tsi1/partition_test.go
+++ b/tsdb/index/tsi1/partition_test.go
@@ -132,7 +132,9 @@ func TestPartition_Compact_Write_Fail(t *testing.T) {
 			t.Fatalf("error closing partition: %v", err)
 		}
 	}()
+	p.Partition.Mu.Lock()
 	p.Partition.MaxLogFileSize = -1
+	p.Partition.Mu.Unlock()
 	fileN := p.FileN()
 	p.Compact()
 	if (1 + fileN) != p.FileN() {