diff --git a/CHANGELOG.md b/CHANGELOG.md
index 581146d97a8..5ff2f8330f4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -75,6 +75,58 @@ Old config will still work but will be removed in a future release. [#1735](http
 * Upgrade `github.com/grafana/dskit`
 * Upgrade `github.com/grafana/e2e`
 * Upgrade `github.com/minio/minio-go/v7`
+* [CHANGE] Config updates to prepare for Tempo 2.0. [#1978](https://github.com/grafana/tempo/pull/1978) (@joe-elliott)
+  Defaults updated:
+  ```
+  query_frontend:
+    max_outstanding_per_tenant: 2000
+    search:
+      concurrent_jobs: 1000
+      target_bytes_per_job: 104857600
+      max_duration: 168h
+      query_ingesters_until: 30m
+    trace_by_id:
+      query_shards: 50
+  querier:
+    max_concurrent_queries: 20
+    search:
+      prefer_self: 10
+  ingester:
+    concurrent_flushes: 4
+    max_block_duration: 30m
+    max_block_bytes: 524288000
+  storage:
+    trace:
+      pool:
+        max_workers: 400
+        queue_depth: 20000
+      search:
+        read_buffer_count: 32
+        read_buffer_size_bytes: 1048576
+  ```
+  **BREAKING CHANGE** Renamed/removed/moved
+  ```
+  query_frontend:
+    query_shards:           // removed. use trace_by_id.query_shards
+  querier:
+    query_timeout:          // removed. use trace_by_id.query_timeout
+  compactor:
+    compaction:
+      chunk_size_bytes:     // renamed to v2_in_buffer_bytes
+      flush_size_bytes:     // renamed to v2_out_buffer_bytes
+      iterator_buffer_size: // renamed to v2_prefetch_traces_count
+  ingester:
+    use_flatbuffer_search:  // removed. automatically set based on block type
+  storage:
+    wal:
+      encoding:             // renamed to v2_encoding
+      version:              // removed and pinned to block.version
+    block:
+      index_downsample_bytes: // renamed to v2_index_downsample_bytes
+      index_page_size_bytes:  // renamed to v2_index_page_size_bytes
+      encoding:               // renamed to v2_encoding
+      row_group_size_bytes:   // renamed to parquet_row_group_size_bytes
+  ```
 * [FEATURE] Add capability to configure the used S3 Storage Class [#1697](https://github.com/grafana/tempo/pull/1714) (@amitsetty)
 * [ENHANCEMENT] cache: expose username and sentinel_username redis configuration options for ACL-based Redis Auth support [#1708](https://github.com/grafana/tempo/pull/1708) (@jsievenpiper)
 * [ENHANCEMENT] metrics-generator: expose span size as a metric [#1662](https://github.com/grafana/tempo/pull/1662) (@ie-pham)
diff --git a/cmd/tempo/app/config.go b/cmd/tempo/app/config.go
index ed793b8ba76..2a729bf3c8f 100644
--- a/cmd/tempo/app/config.go
+++ b/cmd/tempo/app/config.go
@@ -20,7 +20,6 @@ import (
     "github.com/grafana/tempo/pkg/usagestats"
     "github.com/grafana/tempo/pkg/util"
     "github.com/grafana/tempo/tempodb"
-    v2 "github.com/grafana/tempo/tempodb/encoding/v2"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/weaveworks/common/server"
 )
@@ -156,14 +155,25 @@ func (c *Config) CheckConfig() []ConfigWarning {
         warnings = append(warnings, warnStorageTraceBackendLocal)
     }
 
-    // flatbuffers are configured but we're not using v2
-    if c.Ingester.UseFlatbufferSearch && c.StorageConfig.Trace.Block.Version != v2.VersionString {
-        warnings = append(warnings, warnFlatBuffersNotNecessary)
+    // check v2 specific settings
+    if c.StorageConfig.Trace.Block.Version != "v2" && c.StorageConfig.Trace.Block.IndexDownsampleBytes != storage.DefaultIndexDownSampleBytes {
+        warnings = append(warnings, newV2Warning("v2_index_downsample_bytes"))
     }
 
-    // we're using v2 but flatbuffers are not configured
-    if !c.Ingester.UseFlatbufferSearch && c.StorageConfig.Trace.Block.Version == v2.VersionString {
-        warnings = append(warnings, warnIngesterSearchWillNotWork)
+    if c.StorageConfig.Trace.Block.Version != "v2" && c.StorageConfig.Trace.Block.IndexPageSizeBytes != storage.DefaultIndexPageSizeBytes {
+        warnings = append(warnings, newV2Warning("v2_index_page_size_bytes"))
+    }
+
+    if c.StorageConfig.Trace.Block.Version != "v2" && c.Compactor.Compactor.ChunkSizeBytes != tempodb.DefaultChunkSizeBytes {
+        warnings = append(warnings, newV2Warning("v2_in_buffer_bytes"))
+    }
+
+    if c.StorageConfig.Trace.Block.Version != "v2" && c.Compactor.Compactor.FlushSizeBytes != tempodb.DefaultFlushSizeBytes {
+        warnings = append(warnings, newV2Warning("v2_out_buffer_bytes"))
+    }
+
+    if c.StorageConfig.Trace.Block.Version != "v2" && c.Compactor.Compactor.IteratorBufferSize != tempodb.DefaultIteratorBufferSize {
+        warnings = append(warnings, newV2Warning("v2_prefetch_traces_count"))
     }
 
     return warnings
@@ -235,12 +245,11 @@ var (
     warnStorageTraceBackendLocal = ConfigWarning{
         Message: "Local backend will not correctly retrieve traces with a distributed deployment unless all components have access to the same disk. You should probably be using object storage as a backend.",
     }
-    warnFlatBuffersNotNecessary = ConfigWarning{
-        Message: "Flatbuffers enabled with a block type that supports search.",
-        Explain: "The configured block type supports local search in the ingester. Flatbuffers are not necessary and will consume extra resources.",
-    }
-    warnIngesterSearchWillNotWork = ConfigWarning{
-        Message: "Flatbuffers disabled with a block type that does not support search",
-        Explain: "Flatbuffers are disabled but the configured block type does not support ingester search. This can be ignored if only trace by id lookup is desired.",
-    }
 )
+
+func newV2Warning(setting string) ConfigWarning {
+    return ConfigWarning{
+        Message: "c.StorageConfig.Trace.Block.Version != \"v2\" but " + setting + " is set",
+        Explain: "This setting is only used in v2 blocks",
+    }
+}
diff --git a/cmd/tempo/app/config_test.go b/cmd/tempo/app/config_test.go
index 07acf27e97c..97fedf062d3 100644
--- a/cmd/tempo/app/config_test.go
+++ b/cmd/tempo/app/config_test.go
@@ -8,7 +8,8 @@ import (
     "github.com/grafana/tempo/modules/storage"
     "github.com/grafana/tempo/tempodb"
     "github.com/grafana/tempo/tempodb/encoding/common"
-
+    v2 "github.com/grafana/tempo/tempodb/encoding/v2"
+    "github.com/grafana/tempo/tempodb/encoding/vparquet"
     "github.com/stretchr/testify/assert"
 )
@@ -31,7 +32,9 @@ func TestConfig_CheckConfig(t *testing.T) {
                 Trace: tempodb.Config{
                     Backend:       "s3",
                     BlocklistPoll: time.Minute,
-                    Block:         &common.BlockConfig{},
+                    Block: &common.BlockConfig{
+                        Version: "v2",
+                    },
                 },
             },
             Distributor: distributor.Config{
@@ -55,7 +58,9 @@ func TestConfig_CheckConfig(t *testing.T) {
                 cfg.StorageConfig.Trace = tempodb.Config{
                     Backend:                  "local",
                     BlocklistPollConcurrency: 1,
-                    Block:                    &common.BlockConfig{},
+                    Block: &common.BlockConfig{
+                        Version: "v2",
+                    },
                 }
                 cfg.Target = "something"
                 return cfg
@@ -63,22 +68,38 @@ func TestConfig_CheckConfig(t *testing.T) {
             expect: []ConfigWarning{warnStorageTraceBackendLocal},
         },
         {
-            name: "warn ingester search",
+            name: "warnings for v2 settings when they drift from default",
             config: func() *Config {
                 cfg := newDefaultConfig()
-                cfg.StorageConfig.Trace.Block.Version = "v2"
+                cfg.StorageConfig.Trace.Block.Version = vparquet.VersionString
+                cfg.StorageConfig.Trace.Block.IndexDownsampleBytes = 1
+                cfg.StorageConfig.Trace.Block.IndexPageSizeBytes = 1
+                cfg.Compactor.Compactor.ChunkSizeBytes = 1
+                cfg.Compactor.Compactor.FlushSizeBytes = 1
+                cfg.Compactor.Compactor.IteratorBufferSize = 1
                 return cfg
             }(),
-            expect: []ConfigWarning{warnIngesterSearchWillNotWork},
+            expect: []ConfigWarning{
+                newV2Warning("v2_index_downsample_bytes"),
+                newV2Warning("v2_index_page_size_bytes"),
+                newV2Warning("v2_in_buffer_bytes"),
+                newV2Warning("v2_out_buffer_bytes"),
+                newV2Warning("v2_prefetch_traces_count"),
+            },
         },
         {
-            name: "warn flatbuffers not necessary",
+            name: "no warnings for v2 settings when they drift from default and v2 is the block version",
             config: func() *Config {
                 cfg := newDefaultConfig()
-                cfg.Ingester.UseFlatbufferSearch = true
+                cfg.StorageConfig.Trace.Block.Version = v2.VersionString
+                cfg.StorageConfig.Trace.Block.IndexDownsampleBytes = 1
+                cfg.StorageConfig.Trace.Block.IndexPageSizeBytes = 1
+                cfg.Compactor.Compactor.ChunkSizeBytes = 1
+                cfg.Compactor.Compactor.FlushSizeBytes = 1
+                cfg.Compactor.Compactor.IteratorBufferSize = 1
                 return cfg
             }(),
-            expect: []ConfigWarning{warnFlatBuffersNotNecessary},
+            expect: nil,
         },
     }
diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go
index 8760f780a06..f4919c0914d 100644
--- a/cmd/tempo/app/modules.go
+++ b/cmd/tempo/app/modules.go
@@ -38,6 +38,7 @@ import (
     "github.com/grafana/tempo/tempodb/backend/gcs"
     "github.com/grafana/tempo/tempodb/backend/local"
     "github.com/grafana/tempo/tempodb/backend/s3"
+    v2 "github.com/grafana/tempo/tempodb/encoding/v2"
 )
 
 // The various modules that make up tempo.
@@ -145,6 +146,9 @@ func (t *App) initDistributor() (services.Service, error) {
 }
 
 func (t *App) initIngester() (services.Service, error) {
+    // always use flatbuffer search if we're using the v2 blocks. todo: in 2.1 remove flatbuffer search altogether
+    t.cfg.Ingester.UseFlatbufferSearch = (t.cfg.StorageConfig.Trace.Block.Version == v2.VersionString)
+
     t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort
     ingester, err := ingester.New(t.cfg.Ingester, t.store, t.overrides, prometheus.DefaultRegisterer)
     if err != nil {
diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md
index 19022a144fc..1b57af55250 100644
--- a/docs/sources/configuration/_index.md
+++ b/docs/sources/configuration/_index.md
@@ -213,22 +213,16 @@ ingester:
     [flush_check_period: <duration>]
 
     # maximum size of a block before cutting it
-    # (default: 1073741824 = 1GB)
+    # (default: 524288000 = 500MB)
     [max_block_bytes: <int>]
 
     # maximum length of time before cutting a block
-    # (default: 1h)
+    # (default: 30m)
     [max_block_duration: <duration>]
 
     # duration to keep blocks in the ingester after they have been flushed
     # (default: 15m)
     [ complete_block_timeout: <duration>]
-
-    # If true then flatbuffer search metadata files are created and used in the ingester for search,
-    # search tags and search tag values. If false then the blocks themselves are used for search in the ingesters.
-    # Warning: v2 blocks do not support ingester search without this enabled.
-    # (default: false)
-    [ use_flatbuffer_search: <bool>]
 ```
 
 ## Metrics-generator
@@ -361,10 +355,6 @@ query_frontend:
   # (default: 2)
   [max_retries: <int>]
 
-  # The number of shards to split a trace by id query into.
-  # (default: 20)
-  [query_shards: <int>]
-
   # number of block queries that are tolerated to error before considering the entire query as failed
   # numbers greater than 0 make possible for a read to return partial results
   # (default: 0)
@@ -373,11 +363,11 @@ query_frontend:
 
   search:
 
     # The number of concurrent jobs to execute when searching the backend.
-    # (default: 50)
+    # (default: 1000)
     [concurrent_jobs: <int>]
 
     # The target number of bytes for each job to handle when performing a backend search.
-    # (default: 10485760)
+    # (default: 104857600)
     [target_bytes_per_job: <int>]
 
     # Limit used for search requests if none is set by the caller
@@ -392,7 +382,7 @@ query_frontend:
 
     # The maximum allowed time range for a search.
     # 0 disables this limit.
-    # (default: 1h1m0s)
+    # (default: 168h)
     [max_duration: <duration>]
 
     # query_backend_after and query_ingesters_until together control where the query-frontend searches for traces.
@@ -403,11 +393,14 @@ query_frontend:
     # (default: 15m)
     [query_backend_after: <duration>]
 
-    # (default: 1h)
+    # (default: 30m)
     [query_ingesters_until: <duration>]
 
   # Trace by ID lookup configuration
   trace_by_id:
+    # The number of shards to split a trace by id query into.
+    # (default: 50)
+    [query_shards: <int>]
 
     # If set to a non-zero value, a second request will be issued at the provided duration.
     # Recommended to be set to p99 of search requests to reduce long-tail latency.
@@ -428,14 +421,11 @@ The Querier is responsible for querying the backends/cache for the traceID.
 # querier config block
 querier:
 
-  # Timeout for trace lookup requests
-  [query_timeout: <duration> | default = 10s]
-
   # The query frontend turns both trace by id (/api/traces/) and search (/api/search?) requests
   # into subqueries that are then pulled and serviced by the queriers.
   # This value controls the overall number of simultaneous subqueries that the querier will service at once. It does
   # not distinguish between the types of queries.
-  [max_concurrent_queries: <int> | default = 5]
+  [max_concurrent_queries: <int> | default = 20]
 
   # The query frontend sends sharded requests to ingesters and querier (/api/traces/)
   # By default, all healthy ingesters are queried for the trace id.
@@ -444,6 +434,10 @@ querier:
   # If this parameter is set, the number of 404s could increase during rollout or scaling of ingesters.
   [query_relevant_ingesters: <bool> | default = false]
 
+  trace_by_id:
+    # Timeout for trace lookup requests
+    [query_timeout: <duration> | default = 10s]
+
   search:
     # Timeout for search requests
     [query_timeout: <duration> | default = 30s]
@@ -459,7 +453,7 @@ querier:
     # number of subqueries. In the default case of 2 the querier will process up to 2 search requests subqueries before starting
     # to reach out to search_external_endpoints.
    # Setting this to 0 will disable this feature and the querier will proxy all search subqueries to search_external_endpoints.
-    [prefer_self: <int> | default = 2 ]
+    [prefer_self: <int> | default = 10 ]
 
     # If set to a non-zero value a second request will be issued at the provided duration. Recommended to
     # be set to p99 of external search requests to reduce long tail latency.
@@ -514,12 +508,6 @@ compactor:
     # Optional. Blocks in this time window will be compacted together. Default is 1h.
     [compaction_window: <duration>]
 
-    # Optional. Amount of data to buffer from input blocks. Default is 5 MiB.
-    [chunk_size_bytes: <int>]
-
-    # Optional. Flush data to backend when buffer is this large. Default is 30 MB.
-    [flush_size_bytes: <int>]
-
     # Optional. Maximum number of traces in a compacted block. Default is 6 million.
     # WARNING: Deprecated. Use max_block_bytes instead.
     [max_compaction_objects: <int>]
@@ -530,9 +518,6 @@ compactor:
     # Optional. Number of tenants to process in parallel during retention. Default is 10.
     [retention_concurrency: <int>]
 
-    # Optional. Number of traces to buffer in memory during compaction. Increasing may improve performance but will also increase memory usage. Default is 1000.
-    [iterator_buffer_size: <int>]
-
     # Optional. The maximum amount of time to spend compacting a single tenant before moving to the next. Default is 5m.
     [max_time_per_tenant: <duration>]
@@ -540,6 +525,14 @@ compactor:
     # Note: The default will be used if the value is set to 0.
     [compaction_cycle: <duration>]
 
+    # Optional. Amount of data to buffer from input blocks. Default is 5 MiB.
+    [v2_in_buffer_bytes: <int>]
+
+    # Optional. Flush data to backend when buffer is this large. Default is 30 MB.
+    [v2_out_buffer_bytes: <int>]
+
+    # Optional. Number of traces to buffer in memory during compaction. Increasing may improve performance but will also increase memory usage. Default is 1000.
+    [v2_prefetch_traces_count: <int>]
 ```
 
 ## Storage
@@ -786,12 +779,12 @@ storage:
 
         # Size of read buffers used when performing search on a vparquet block. This value times the read_buffer_count
         # is the total amount of bytes used for buffering when performing search on a parquet block.
-        # Default: 4194304
+        # Default: 1048576
         [read_buffer_size_bytes: <int>]
 
         # Number of read buffers used when performing search on a vparquet block. This value times the read_buffer_size_bytes
         # is the total amount of bytes used for buffering when performing search on a parquet block.
-        # Default: 8
+        # Default: 32
         [read_buffer_count: <int>]
 
         # Granular cache control settings for parquet metadata objects
@@ -923,11 +916,11 @@ storage:
 
     # the worker pool is used primarily when finding traces by id, but is also used by other
     pool:
-      # total number of workers pulling jobs from the queue (default: 50)
+      # total number of workers pulling jobs from the queue (default: 400)
      [max_workers: <int>]
 
      # length of job queue. important for querier as it queues a job for every block it has to search
-      # (default: 10000)
+      # (default: 20000)
      [queue_depth: <int>]
 
    # Configuration block for the Write Ahead Log (WAL)
@@ -939,7 +932,7 @@ storage:
 
      # wal encoding/compression.
      # options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
-      [encoding: <string> | default = snappy]
+      [v2_encoding: <string> | default = snappy]
 
      # Defines the search data encoding/compression protocol.
      # Options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
@@ -957,6 +950,8 @@ storage:
 
    # block configuration
    block:
+      # block format version. options: v2, vParquet
+      [version: <string> | default = vParquet]
 
      # bloom filter false positive rate. lower values create larger filters but fewer false positives
      [bloom_filter_false_positive: <float> | default = 0.01]
 
      [bloom_filter_shard_size_bytes: <int> | default = 100KiB]
 
      # number of bytes per index record
-      [index_downsample_bytes: <int> | default = 1MiB]
-
-      # block format version. options: v2, vParquet
-      [version: <string> | default = v2]
+      [v2_index_downsample_bytes: <int> | default = 1MiB]
 
      # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
-      [encoding: <string> | default = zstd]
+      [v2_encoding: <string> | default = zstd]
 
      # search data encoding/compression. same options as block encoding.
      [search_encoding: <string> | default = snappy]
@@ -982,7 +974,7 @@ storage:
      # an estimate of the number of bytes per row group when cutting Parquet blocks. lower values will
      # create larger footers but will be harder to shard when searching. It is difficult to calculate
      # this field directly and it may vary based on workload. This is roughly a lower bound.
-      [row_group_size_bytes: <int> | default = 100MB]
+      [parquet_row_group_size_bytes: <int> | default = 100MB]
 ```
 
 ## Memberlist
diff --git a/docs/sources/configuration/compression.md b/docs/sources/configuration/compression.md
index 3a188cabe46..cc287e2878e 100644
--- a/docs/sources/configuration/compression.md
+++ b/docs/sources/configuration/compression.md
@@ -16,7 +16,7 @@ Compression is configured under storage like so:
 storage:
   trace:
     block:
-      encoding: zstd
+      v2_encoding: zstd
 ```
 
 The following options are supported:
@@ -45,5 +45,5 @@ but only `snappy` has been tested at scale.
 storage:
   trace:
     wal:
-      encoding: snappy
+      v2_encoding: snappy
 ```
\ No newline at end of file
diff --git a/docs/sources/configuration/manifest.md b/docs/sources/configuration/manifest.md
index ec638eeca2f..037e0cb906a 100644
--- a/docs/sources/configuration/manifest.md
+++ b/docs/sources/configuration/manifest.md
@@ -25,7 +25,7 @@ http_api_prefix: ""
 server:
   http_listen_network: tcp
   http_listen_address: ""
-  http_listen_port: 80
+  http_listen_port: 3100
   http_listen_conn_limit: 0
   grpc_listen_network: tcp
   grpc_listen_address: ""
@@ -99,7 +99,7 @@ distributor:
       mirror_timeout: 2s
     heartbeat_period: 5s
     heartbeat_timeout: 5m0s
-    instance_id: hostname
+    instance_id: joe
     instance_interface_names:
     - eth0
    - en0
@@ -164,12 +164,13 @@ metrics_generator_client:
 querier:
   search:
     query_timeout: 30s
-    prefer_self: 2
+    prefer_self: 10
     external_endpoints: []
     external_hedge_requests_at: 8s
     external_hedge_requests_up_to: 2
-  query_timeout: 10s
-  max_concurrent_queries: 5
+  trace_by_id:
+    query_timeout: 10s
+  max_concurrent_queries: 20
   frontend_worker:
     frontend_address: 127.0.0.1:9095
     dns_lookup_duration: 10s
@@ -197,19 +198,19 @@ querier:
     tls_min_version: ""
   query_relevant_ingesters: false
 query_frontend:
-  max_outstanding_per_tenant: 100
+  max_outstanding_per_tenant: 2000
   querier_forget_delay: 0s
   max_retries: 2
   search:
-    concurrent_jobs: 50
-    target_bytes_per_job: 10485760
+    concurrent_jobs: 1000
+    target_bytes_per_job: 104857600
     default_result_limit: 20
     max_result_limit: 0
-    max_duration: 1h1m0s
+    max_duration: 168h0m0s
     query_backend_after: 15m0s
-    query_ingesters_until: 1h0m0s
+    query_ingesters_until: 30m0s
   trace_by_id:
-    query_shards: 20
+    query_shards: 50
     hedge_requests_at: 2s
     hedge_requests_up_to: 2
 compactor:
@@ -248,7 +249,7 @@ compactor:
       heartbeat_timeout: 1m0s
     wait_stability_min_duration: 1m0s
     wait_stability_max_duration: 5m0s
-    instance_id: hostname
+    instance_id: joe
     instance_interface_names:
     - eth0
    - en0
@@ -256,15 +257,15 @@ compactor:
     instance_addr: ""
     wait_active_instance_timeout: 10m0s
   compaction:
-    chunk_size_bytes: 5242880
-    flush_size_bytes: 20971520
+    v2_in_buffer_bytes: 5242880
+    v2_out_buffer_bytes: 20971520
+    v2_prefetch_traces_count: 1000
     compaction_window: 1h0m0s
     max_compaction_objects: 6000000
     max_block_bytes: 107374182400
     block_retention: 336h0m0s
     compacted_block_retention: 1h0m0s
     retention_concurrency: 10
-    iterator_buffer_size: 1000
     max_time_per_tenant: 5m0s
     compaction_cycle: 30s
   override_ring_key: compactor
@@ -312,7 +313,21 @@ ingester:
     join_after: 0s
     min_ready_duration: 15s
     interface_names:
-    - en0
+    - wlp2s0
+    - br-39d728775b03
+    - br-d5846bf66182
+    - br-ea48fef4186e
+    - br-f163873defd4
+    - br-24f5062c6edd
+    - br-3b836e91bc36
+    - br-9cef180f0356
+    - br-a5544df3e712
+    - br-14ab1fbc2f0e
+    - br-16536cce4aa3
+    - docker0
+    - br-721c7a5d3933
+    - br-dd28551f2dbd
+    - br-d3d1776850a0
     final_sleep: 0s
     tokens_file_path: ""
     availability_zone: ""
@@ -320,16 +335,15 @@ ingester:
     readiness_check_ring_health: true
     address: 127.0.0.1
    port: 0
-    id: hostname
-  concurrent_flushes: 16
+    id: joe
+  concurrent_flushes: 4
   flush_check_period: 10s
   flush_op_timeout: 5m0s
   trace_idle_period: 10s
-  max_block_duration: 1h0m0s
-  max_block_bytes: 1073741824
+  max_block_duration: 30m0s
+  max_block_bytes: 524288000
   complete_block_timeout: 15m0s
   override_ring_key: ring
-  use_flatbuffer_search: false
 metrics_generator:
   ring:
     kvstore:
@@ -364,7 +378,7 @@ metrics_generator:
       mirror_timeout: 2s
     heartbeat_period: 5s
     heartbeat_timeout: 1m0s
-    instance_id: hostname
+    instance_id: joe
     instance_interface_names:
     - eth0
    - en0
@@ -426,31 +440,31 @@ storage:
   trace:
     pool:
-      max_workers: 50
-      queue_depth: 10000
+      max_workers: 400
+      queue_depth: 20000
     wal:
       path: /tmp/tempo/wal
       completedfilepath: /tmp/tempo/wal/completed
       blocksfilepath: /tmp/tempo/wal/blocks
-      encoding: snappy
+      v2_encoding: snappy
       search_encoding: none
-      version: v2
       ingestion_time_range_slack: 2m0s
+      version: vParquet
     block:
-      index_downsample_bytes: 1048576
-      index_page_size_bytes: 256000
       bloom_filter_false_positive: 0.01
       bloom_filter_shard_size_bytes: 102400
       version: vParquet
-      encoding: zstd
       search_encoding: snappy
       search_page_size_bytes: 1048576
-      row_group_size_bytes: 100000000
+      v2_index_downsample_bytes: 1048576
+      v2_index_page_size_bytes: 256000
+      v2_encoding: zstd
+      parquet_row_group_size_bytes: 100000000
     search:
       chunk_size_bytes: 1000000
       prefetch_trace_count: 1000
-      read_buffer_count: 8
-      read_buffer_size_bytes: 4194304
+      read_buffer_count: 32
+      read_buffer_size_bytes: 1048576
       cache_control:
         footer: false
         column_index: false
diff --git a/docs/sources/setup/linux.md b/docs/sources/setup/linux.md
index f7a9a735d3d..bb7f5868bdf 100644
--- a/docs/sources/setup/linux.md
+++ b/docs/sources/setup/linux.md
@@ -127,11 +127,11 @@ The [extended instructions for installing the TNS application]({{< relref "../li
       secret_key: # TODO - Add S3 secret key
     block:
       bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
-      index_downsample_bytes: 1000 # number of bytes per index record
-      encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_index_downsample_bytes: 1000 # number of bytes per index record
+      v2_encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
-      encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     local:
       path: /tmp/tempo/blocks
     pool:
diff --git a/example/docker-compose/azure/tempo-azure.yaml b/example/docker-compose/azure/tempo-azure.yaml
index df19ef8ad53..af35625f961 100644
--- a/example/docker-compose/azure/tempo-azure.yaml
+++ b/example/docker-compose/azure/tempo-azure.yaml
@@ -47,11 +47,11 @@ storage:
     backend: azure # backend configuration to use
     block:
       bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
-      index_downsample_bytes: 1000 # number of bytes per index record
-      encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_index_downsample_bytes: 1000 # number of bytes per index record
+      v2_encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
-      encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     azure:
       container_name: tempo # how to store data in azure
       endpoint_suffix: azurite:10000
diff --git a/example/docker-compose/distributed/tempo-distributed.yaml b/example/docker-compose/distributed/tempo-distributed.yaml
index fe62ff797ba..7aca66da0c6 100644
--- a/example/docker-compose/distributed/tempo-distributed.yaml
+++ b/example/docker-compose/distributed/tempo-distributed.yaml
@@ -41,6 +41,7 @@ compactor:
     compacted_block_retention: 10m
 
 query_frontend:
+  trace_by_id:
     query_shards: 2
 
 querier:
@@ -69,7 +70,7 @@ storage:
       insecure: true
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
-      encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     local:
       path: /tmp/tempo/blocks
     pool:
diff --git a/example/docker-compose/gcs/tempo-gcs.yaml b/example/docker-compose/gcs/tempo-gcs.yaml
index da23cefa13e..169691d7030 100644
--- a/example/docker-compose/gcs/tempo-gcs.yaml
+++ b/example/docker-compose/gcs/tempo-gcs.yaml
@@ -30,7 +30,7 @@ compactor:
     max_block_bytes: 100_000_000 # maximum size of compacted blocks
     block_retention: 1h
     compacted_block_retention: 10m
-    flush_size_bytes: 5242880
+    v2_out_buffer_bytes: 5242880
 
 metrics_generator:
   registry:
@@ -48,11 +48,11 @@ storage:
     backend: gcs # backend configuration to use
     block:
       bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
-      index_downsample_bytes: 1000 # number of bytes per index record
-      encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_index_downsample_bytes: 1000 # number of bytes per index record
+      v2_encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
-      encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     gcs:
       bucket_name: tempo
       endpoint: https://gcs:4443/storage/v1/
diff --git a/example/docker-compose/local/tempo-local.yaml b/example/docker-compose/local/tempo-local.yaml
index 21440817f7f..46c9052864a 100644
--- a/example/docker-compose/local/tempo-local.yaml
+++ b/example/docker-compose/local/tempo-local.yaml
@@ -47,11 +47,11 @@ storage:
     backend: local # backend configuration to use
     block:
       bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
-      index_downsample_bytes: 1000 # number of bytes per index record
-      encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_index_downsample_bytes: 1000 # number of bytes per index record
+      v2_encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
-      encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     local:
       path: /tmp/tempo/blocks
     pool:
diff --git a/example/docker-compose/s3/tempo-s3.yaml b/example/docker-compose/s3/tempo-s3.yaml
index 02c741b106a..9dc12c7df9e 100644
--- a/example/docker-compose/s3/tempo-s3.yaml
+++ b/example/docker-compose/s3/tempo-s3.yaml
@@ -30,7 +30,7 @@ compactor:
     max_block_bytes: 100_000_000 # maximum size of compacted blocks
     block_retention: 1h
     compacted_block_retention: 10m
-    flush_size_bytes: 5242880
+    v2_out_buffer_bytes: 5242880
 
 metrics_generator:
   registry:
@@ -48,11 +48,11 @@ storage:
     backend: s3 # backend configuration to use
     block:
       bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
-      index_downsample_bytes: 1000 # number of bytes per index record
-      encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_index_downsample_bytes: 1000 # number of bytes per index record
+      v2_encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
-      encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     s3:
       bucket: tempo # how to store data in s3
       endpoint: minio:9000
diff --git a/example/docker-compose/scalable-single-binary/tempo-scalable-single-binary.yaml b/example/docker-compose/scalable-single-binary/tempo-scalable-single-binary.yaml
index d5b4d5519b2..9757cc4ddde 100644
--- a/example/docker-compose/scalable-single-binary/tempo-scalable-single-binary.yaml
+++ b/example/docker-compose/scalable-single-binary/tempo-scalable-single-binary.yaml
@@ -64,11 +64,11 @@ storage:
     backend: s3 # backend configuration to use
     block:
       bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
-      index_downsample_bytes: 1000 # number of bytes per index record
-      encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_index_downsample_bytes: 1000 # number of bytes per index record
+      v2_encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
-      encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     s3:
       bucket: tempo # how to store data in s3
       endpoint: minio:9000
diff --git a/example/docker-compose/tempo-search/tempo.yaml b/example/docker-compose/tempo-search/tempo.yaml
index 1d856edf7e4..a1b1a2379ac 100644
--- a/example/docker-compose/tempo-search/tempo.yaml
+++ b/example/docker-compose/tempo-search/tempo.yaml
@@ -36,12 +36,12 @@ storage:
     backend: local # backend configuration to use
     block:
       bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
-      index_downsample_bytes: 1000 # number of bytes per index record
-      encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_index_downsample_bytes: 1000 # number of bytes per index record
+      v2_encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
       version: vParquet
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
-      encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
+      v2_encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
     local:
       path: /tmp/tempo/blocks
     pool:
diff --git a/modules/compactor/config.go b/modules/compactor/config.go
index 2ac72b76b08..6a3fd525432 100644
--- a/modules/compactor/config.go
+++ b/modules/compactor/config.go
@@ -22,7 +22,7 @@ type Config struct {
 // RegisterFlagsAndApplyDefaults registers the flags.
 func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
     cfg.Compactor = tempodb.CompactorConfig{
-        ChunkSizeBytes:          5 * 1024 * 1024, // 5 MiB
+        ChunkSizeBytes:          tempodb.DefaultChunkSizeBytes, // 5 MiB
         FlushSizeBytes:          tempodb.DefaultFlushSizeBytes,
         CompactedBlockRetention: time.Hour,
         RetentionConcurrency:    tempodb.DefaultRetentionConcurrency,
diff --git a/modules/frontend/config.go b/modules/frontend/config.go
index 038de84d754..1faee69983a 100644
--- a/modules/frontend/config.go
+++ b/modules/frontend/config.go
@@ -19,14 +19,11 @@ var (
 )
 
 type Config struct {
-    Config               v1.Config       `yaml:",inline"`
-    MaxRetries           int             `yaml:"max_retries,omitempty"`
-    TolerateFailedBlocks int             `yaml:"tolerate_failed_blocks,omitempty"`
-    Search               SearchConfig    `yaml:"search"`
-    // Deprecated: Use TraceByID.QueryShards instead.
-    // TODO: Remove QueryShards with Tempo v2
-    QueryShards int             `yaml:"query_shards,omitempty"`
-    TraceByID   TraceByIDConfig `yaml:"trace_by_id"`
+    Config               v1.Config       `yaml:",inline"`
+    MaxRetries           int             `yaml:"max_retries,omitempty"`
+    TolerateFailedBlocks int             `yaml:"tolerate_failed_blocks,omitempty"`
+    Search               SearchConfig    `yaml:"search"`
+    TraceByID            TraceByIDConfig `yaml:"trace_by_id"`
 }
 
 type SearchConfig struct {
@@ -44,22 +41,22 @@ type HedgingConfig struct {
 }
 
 func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
-    cfg.Config.MaxOutstandingPerTenant = 100
+    cfg.Config.MaxOutstandingPerTenant = 2000
     cfg.MaxRetries = 2
     cfg.TolerateFailedBlocks = 0
     cfg.Search = SearchConfig{
         Sharder: SearchSharderConfig{
             QueryBackendAfter:     15 * time.Minute,
-            QueryIngestersUntil:   time.Hour,
+            QueryIngestersUntil:   30 * time.Minute,
             DefaultLimit:          20,
             MaxLimit:              0,
-            MaxDuration:           61 * time.Minute,
+            MaxDuration:           168 * time.Hour, // 1 week
             ConcurrentRequests:    defaultConcurrentRequests,
             TargetBytesPerRequest: defaultTargetBytesPerRequest,
         },
     }
     cfg.TraceByID = TraceByIDConfig{
-        QueryShards: 20,
+        QueryShards: 50,
         Hedging: HedgingConfig{
             HedgeRequestsAt:   2 * time.Second,
             HedgeRequestsUpTo: 2,
diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go
index b7ea4088667..906996a175f 100644
--- a/modules/frontend/frontend.go
+++ b/modules/frontend/frontend.go
@@ -42,11 +42,6 @@ type QueryFrontend struct {
 func New(cfg Config, next http.RoundTripper, o *overrides.Overrides, store storage.Store, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
     level.Info(logger).Log("msg", "creating middleware in query frontend")
 
-    if cfg.QueryShards != 0 {
-        cfg.TraceByID.QueryShards = cfg.QueryShards
-        level.Warn(logger).Log("msg", "query_shards is deprecated, use trace_by_id.query_shards instead")
-    }
-
     if cfg.TraceByID.QueryShards < minQueryShards || cfg.TraceByID.QueryShards > maxQueryShards {
         return nil, fmt.Errorf("frontend query shards should be between %d and %d (both inclusive)", minQueryShards, maxQueryShards)
     }
diff --git a/modules/frontend/frontend_test.go b/modules/frontend/frontend_test.go
index e0247d4b750..66d70d87947 100644
--- a/modules/frontend/frontend_test.go
+++ b/modules/frontend/frontend_test.go
@@ -24,7 +24,10 @@ func (s *mockNextTripperware) RoundTrip(_ *http.Request) (*http.Response, error)
 func TestFrontendRoundTripsSearch(t *testing.T) {
     next := &mockNextTripperware{}
-    f, err := New(Config{QueryShards: minQueryShards,
+    f, err := New(Config{
+        TraceByID: TraceByIDConfig{
+            QueryShards: minQueryShards,
+        },
         Search: SearchConfig{
             Sharder: SearchSharderConfig{
                 ConcurrentRequests:    defaultConcurrentRequests,
@@ -43,7 +46,10 @@ func TestFrontendRoundTripsSearch(t *testing.T) {
 }
 
 func TestFrontendBadConfigFails(t *testing.T) {
-    f, err := New(Config{QueryShards: minQueryShards - 1,
+    f, err := New(Config{
+        TraceByID: TraceByIDConfig{
+            QueryShards: minQueryShards - 1,
+        },
         Search: SearchConfig{
             Sharder: SearchSharderConfig{
                 ConcurrentRequests:    defaultConcurrentRequests,
@@ -54,7 +60,10 @@ func TestFrontendBadConfigFails(t *testing.T) {
     assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
     assert.Nil(t, f)
 
-    f, err = New(Config{QueryShards: maxQueryShards + 1,
+    f, err = New(Config{
+        TraceByID: TraceByIDConfig{
+            QueryShards: maxQueryShards + 1,
+        },
         Search: SearchConfig{
             Sharder: SearchSharderConfig{
                 ConcurrentRequests:    defaultConcurrentRequests,
@@ -65,7 +74,10 @@ func TestFrontendBadConfigFails(t *testing.T) {
     assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
     assert.Nil(t, f)
 
-    f, err = New(Config{QueryShards: maxQueryShards,
+    f, err = New(Config{
+        TraceByID: TraceByIDConfig{
+            QueryShards: maxQueryShards,
+        },
         Search: SearchConfig{
             Sharder: SearchSharderConfig{
                 ConcurrentRequests: 0,
@@ -76,7 +88,10 @@ func TestFrontendBadConfigFails(t *testing.T) {
     assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
     assert.Nil(t, f)
 
-    f, err = New(Config{QueryShards: maxQueryShards,
+    f, err = New(Config{
+        TraceByID: TraceByIDConfig{
+            QueryShards: maxQueryShards,
+        },
         Search: SearchConfig{
             Sharder: SearchSharderConfig{
                 ConcurrentRequests:    defaultConcurrentRequests,
@@ -87,7 +102,10 @@ func TestFrontendBadConfigFails(t *testing.T) {
     assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
     assert.Nil(t, f)
 
-    f, err = New(Config{QueryShards: maxQueryShards,
+    f, err = New(Config{
+        TraceByID: TraceByIDConfig{
+            QueryShards: maxQueryShards,
+        },
         Search: SearchConfig{
             Sharder: SearchSharderConfig{
                 ConcurrentRequests:    defaultConcurrentRequests,
diff --git a/modules/frontend/searchsharding.go b/modules/frontend/searchsharding.go
index a6d43800463..a2b13da8178 100644
--- a/modules/frontend/searchsharding.go
+++ b/modules/frontend/searchsharding.go
@@ -25,8 +25,8 @@ import (
 )
 
 const (
-    defaultTargetBytesPerRequest = 10 * 1024 * 1024
-    defaultConcurrentRequests    = 50
+    defaultTargetBytesPerRequest = 100 * 1024 * 1024
+    defaultConcurrentRequests    = 1000
 )
 
 var (
diff --git a/modules/frontend/v1/frontend.go b/modules/frontend/v1/frontend.go
index a9e9b28bbd9..25d21c94dd8 100644
--- a/modules/frontend/v1/frontend.go
+++ b/modules/frontend/v1/frontend.go
@@ -36,7 +36,7 @@ type Config struct {
 
 // RegisterFlags adds the flags required to config this to the given FlagSet.
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
-    f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
+    f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 2000, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
     f.DurationVar(&cfg.QuerierForgetDelay, "query-frontend.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-frontend will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
 }
diff --git a/modules/ingester/config.go b/modules/ingester/config.go
index c2f4b4fa860..cd9bf3e2279 100644
--- a/modules/ingester/config.go
+++ b/modules/ingester/config.go
@@ -25,7 +25,7 @@ type Config struct {
     MaxBlockBytes        uint64        `yaml:"max_block_bytes"`
     CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"`
     OverrideRingKey      string        `yaml:"override_ring_key"`
-    UseFlatbufferSearch  bool          `yaml:"use_flatbuffer_search"`
+    UseFlatbufferSearch  bool          `yaml:"-"` // no longer allow this to be set via config. this will be set based on the block version for now and then removed in 2.1
 }
 
 // RegisterFlagsAndApplyDefaults registers the flags.
@@ -36,14 +36,13 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
     cfg.LifecyclerConfig.RingConfig.ReplicationFactor = 1
     cfg.LifecyclerConfig.RingConfig.HeartbeatTimeout = 5 * time.Minute
 
-    cfg.ConcurrentFlushes = 16
+    cfg.ConcurrentFlushes = 4
     cfg.FlushCheckPeriod = 10 * time.Second
     cfg.FlushOpTimeout = 5 * time.Minute
-    cfg.UseFlatbufferSearch = false
 
     f.DurationVar(&cfg.MaxTraceIdle, prefix+".trace-idle-period", 10*time.Second, "Duration after which to consider a trace complete if no spans have been received")
-    f.DurationVar(&cfg.MaxBlockDuration, prefix+".max-block-duration", time.Hour, "Maximum duration which the head block can be appended to before cutting it.")
-    f.Uint64Var(&cfg.MaxBlockBytes, prefix+".max-block-bytes", 1024*1024*1024, "Maximum size of the head block before cutting it.")
+    f.DurationVar(&cfg.MaxBlockDuration, prefix+".max-block-duration", 30*time.Minute, "Maximum duration which the head block can be appended to before cutting it.")
+    f.Uint64Var(&cfg.MaxBlockBytes, prefix+".max-block-bytes", 500*1024*1024, "Maximum size of the head block before cutting it.")
     f.DurationVar(&cfg.CompleteBlockTimeout, prefix+".complete-block-timeout", 3*tempodb.DefaultBlocklistPoll, "Duration to keep blocks in the ingester after they have been flushed.")
 
     hostname, err := os.Hostname()
diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go
index cee52930ebc..4f7bee1e210 100644
--- a/modules/ingester/ingester_test.go
+++ b/modules/ingester/ingester_test.go
@@ -12,7 +12,6 @@ import (
     "github.com/grafana/dskit/flagext"
     "github.com/grafana/dskit/kv/consul"
     "github.com/grafana/dskit/ring"
-    v2 "github.com/grafana/tempo/tempodb/encoding/v2"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/stretchr/testify/require"
     "github.com/weaveworks/common/user"
@@ -51,7 +50,7 @@ func TestPushQueryAllEncodings(t *testing.T) {
         tmpDir := t.TempDir()
 
         ctx := user.InjectOrgID(context.Background(), "test")
-        ingester, traces, traceIDs := defaultIngesterWithPush(t, tmpDir, v2.VersionString, push)
+        ingester, traces, traceIDs := defaultIngesterWithPush(t, tmpDir, push)
 
         // live trace search
         for pos, traceID := range traceIDs {
@@ -84,7 +83,7 @@ func TestPushQueryAllEncodings(t *testing.T) {
 func TestFullTraceReturned(t *testing.T) {
     ctx := user.InjectOrgID(context.Background(), "test")
 
-    ingester, _, _ := defaultIngester(t, t.TempDir(), v2.VersionString)
+    ingester, _, _ := defaultIngester(t, t.TempDir())
 
     traceID := make([]byte, 16)
     _, err := rand.Read(traceID)
@@ -129,7 +128,7 @@ func TestWal(t *testing.T) {
     tmpDir := t.TempDir()
 
     ctx := user.InjectOrgID(context.Background(), "test")
-    ingester, traces, traceIDs := defaultIngester(t, tmpDir, v2.VersionString)
+    ingester, traces, traceIDs := defaultIngester(t, tmpDir)
 
     for pos, traceID := range traceIDs {
         foundTrace, err := ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{
@@ -146,7 +145,7 @@ func TestWal(t *testing.T) {
     }
 
     // create new ingester. this should replay wal!
-    ingester, _, _ = defaultIngester(t, tmpDir, v2.VersionString)
+    ingester, _, _ = defaultIngester(t, tmpDir)
 
     // should be able to find old traces that were replayed
     for i, traceID := range traceIDs {
@@ -182,7 +181,7 @@ func TestWal(t *testing.T) {
 func TestWalDropsZeroLength(t *testing.T) {
     tmpDir := t.TempDir()
-    ingester, _, _ := defaultIngester(t, tmpDir, v2.VersionString)
+    ingester, _, _ := defaultIngester(t, tmpDir)
 
     // force cut all traces and wipe wal
     for _, instance := range ingester.instances {
@@ -203,7 +202,7 @@ func TestWalDropsZeroLength(t *testing.T) {
     }
 
     // create new ingester. we should have no tenants b/c we all our wals should have been 0 length
-    ingester, _, _ = defaultIngesterWithPush(t, tmpDir, v2.VersionString, func(t testing.TB, i *Ingester, rs *v1.ResourceSpans, b []byte) {})
+    ingester, _, _ = defaultIngesterWithPush(t, tmpDir, func(t testing.TB, i *Ingester, rs *v1.ResourceSpans, b []byte) {})
     require.Equal(t, 0, len(ingester.instances))
 }
 
@@ -212,7 +211,7 @@ func TestSearchWAL(t *testing.T) {
     require.NoError(t, err, "unexpected error getting tempdir")
     defer os.RemoveAll(tmpDir)
 
-    i := defaultIngesterModule(t, tmpDir, v2.VersionString)
+    i := defaultIngesterModule(t, tmpDir)
     inst, _ := i.getOrCreateInstance("test")
     require.NotNil(t, inst)
 
@@ -254,7 +253,7 @@ func TestSearchWAL(t *testing.T) {
     time.Sleep(500 * time.Millisecond)
 
     // replay wal
-    i = defaultIngesterModule(t, tmpDir, v2.VersionString)
+    i = defaultIngesterModule(t, tmpDir)
     inst, ok := i.getInstanceByID("test")
     require.True(t, ok)
 
@@ -317,7 +316,7 @@ func TestFlush(t *testing.T) {
     tmpDir := t.TempDir()
 
     ctx := user.InjectOrgID(context.Background(), "test")
-    ingester, traces, traceIDs := defaultIngester(t, tmpDir, v2.VersionString)
+    ingester, traces, traceIDs := defaultIngester(t, tmpDir)
 
     for pos, traceID := range traceIDs {
         foundTrace, err := ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{
@@ -331,7 +330,7 @@ func TestFlush(t *testing.T) {
     require.NoError(t, ingester.stopping(nil))
 
     // create new ingester. this should replay wal!
-    ingester, _, _ = defaultIngester(t, tmpDir, v2.VersionString)
+    ingester, _, _ = defaultIngester(t, tmpDir)
 
     // should be able to find old traces that were replayed
     for i, traceID := range traceIDs {
@@ -345,7 +344,7 @@ func TestFlush(t *testing.T) {
     }
 }
 
-func defaultIngesterModule(t testing.TB, tmpDir, walVersion string) *Ingester {
+func defaultIngesterModule(t testing.TB, tmpDir string) *Ingester {
     ingesterConfig := defaultIngesterTestConfig()
     limits, err := overrides.NewOverrides(defaultLimitsTestConfig())
     require.NoError(t, err, "unexpected error creating overrides")
@@ -366,7 +365,6 @@ func defaultIngesterModule(t testing.TB, tmpDir, walVersion string) *Ingester {
         },
         WAL: &wal.Config{
             Filepath: tmpDir,
-            Version:  walVersion,
         },
     },
 }, log.NewNopLogger())
@@ -382,12 +380,12 @@ func defaultIngesterModule(t testing.TB, tmpDir string) *Ingester {
     return ingester
 }
 
-func defaultIngester(t testing.TB, tmpDir string, walVersion string) (*Ingester, []*tempopb.Trace, [][]byte) {
-    return defaultIngesterWithPush(t, tmpDir, walVersion, pushBatchV2)
+func defaultIngester(t testing.TB, tmpDir string) (*Ingester, []*tempopb.Trace, [][]byte) {
+    return defaultIngesterWithPush(t, tmpDir, pushBatchV2)
 }
 
-func defaultIngesterWithPush(t testing.TB, tmpDir, walVersion string, push func(testing.TB, *Ingester, *v1.ResourceSpans, []byte)) (*Ingester, []*tempopb.Trace, [][]byte) {
-    ingester := defaultIngesterModule(t, tmpDir, walVersion)
+func defaultIngesterWithPush(t testing.TB, tmpDir string, push func(testing.TB, *Ingester, *v1.ResourceSpans, []byte)) (*Ingester, []*tempopb.Trace, [][]byte) {
+    ingester := defaultIngesterModule(t, tmpDir)
 
     // make some fake traceIDs/requests
     traces := make([]*tempopb.Trace, 0)
@@ -433,6 +431,7 @@ func defaultIngesterTestConfig() Config {
     cfg.LifecyclerConfig.Addr = "localhost"
     cfg.LifecyclerConfig.ID = "localhost"
     cfg.LifecyclerConfig.FinalSleep = 0
+    cfg.UseFlatbufferSearch = false
 
     return cfg
 }
diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go
index 7a1cb5f7df3..f46f937f15f 100644
--- a/modules/ingester/instance_search.go
+++ b/modules/ingester/instance_search.go
@@ -159,6 +159,9 @@ func (i *instance) searchWAL(ctx context.Context, req *tempopb.SearchRequest, p
     }
 
     searchWalBlock := func(b common.WALBlock) {
+        i.blocksMtx.Lock()
+        defer i.blocksMtx.Unlock()
+
         blockID := b.BlockMeta().BlockID.String()
         span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchWALBlock", opentracing.Tags{
             "blockID": blockID,
@@ -187,6 +190,7 @@ func (i *instance) searchWAL(ctx context.Context, req *tempopb.SearchRequest, p
 
         sr.AddBlockInspected()
         sr.AddBytesInspected(resp.Metrics.InspectedBytes)
+        sr.AddTraceInspected(resp.Metrics.InspectedTraces)
         for _, r := range resp.Traces {
             sr.AddResult(ctx, r)
         }
@@ -328,28 +332,41 @@ func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse,
         }
     }
 
-    // local blocks
-    if !distinctValues.Exceeded() {
-        i.blocksMtx.RLock()
-        defer i.blocksMtx.RUnlock()
-        for _, b := range i.completeBlocks {
-            _, ok := i.searchCompleteBlocks[b]
-            if ok {
-                // no need to search this block, we already did above
-                continue
-            }
+    search := func(s common.Searcher, dv *util.DistinctStringCollector) error {
+        if s == nil {
+            return nil
+        }
+        if dv.Exceeded() {
+            return nil
+        }
+        err = s.SearchTags(ctx, dv.Collect, common.DefaultSearchOptions())
+        if err != nil && err != common.ErrUnsupported {
+            return fmt.Errorf("unexpected error searching tags: %w", err)
+        }
 
-            err = b.SearchTags(ctx, distinctValues.Collect, common.DefaultSearchOptions())
-            if err == common.ErrUnsupported {
-                level.Warn(log.Logger).Log("msg", "block does not support tag search", "blockID", b.BlockMeta().BlockID)
-                continue
-            }
-            if err != nil {
-                return nil, fmt.Errorf("unexpected error searching tags (%s): %w", b.BlockMeta().BlockID, err)
-            }
-            if distinctValues.Exceeded() {
-                break
-            }
+        return nil
+    }
+
+    i.blocksMtx.RLock()
+    defer i.blocksMtx.RUnlock()
+
+    // search parquet wal/completing blocks/completed blocks
+    if err = search(i.headBlock, distinctValues); err != nil {
+        return nil, fmt.Errorf("unexpected error searching head block (%s): %w", i.headBlock.BlockMeta().BlockID, err)
+    }
+    for _, b := range i.completingBlocks {
+        if err = search(b, distinctValues); err != nil {
+            return nil, fmt.Errorf("unexpected error searching completing block (%s): %w", b.BlockMeta().BlockID, err)
+        }
+    }
+    for _, b := range i.completeBlocks {
+        _, ok := i.searchCompleteBlocks[b]
+        if ok {
+            // no need to search this block, we already did above
+            continue
+        }
+        if err = search(b, distinctValues); err != nil {
+            return nil, fmt.Errorf("unexpected error searching complete block (%s): %w", b.BlockMeta().BlockID, err)
         }
     }
@@ -387,38 +404,41 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
         return nil, err
     }
 
-    // wal + search blocks
-    if !distinctValues.Exceeded() {
-        err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
-            return block.TagValues(ctx, tagName, distinctValues.Collect)
-        })
-        if err != nil {
-            return nil, err
+    search := func(s common.Searcher, dv *util.DistinctStringCollector) error {
+        if s == nil {
+            return nil
+        }
+        if dv.Exceeded() {
+            return nil
         }
+        err = s.SearchTagValues(ctx, tagName, dv.Collect, common.DefaultSearchOptions())
+        if err != nil && err != common.ErrUnsupported {
+            return fmt.Errorf("unexpected error searching tag values (%s): %w", tagName, err)
+        }
+
+        return nil
     }
 
-    // local blocks
-    if !distinctValues.Exceeded() {
-        i.blocksMtx.RLock()
-        defer i.blocksMtx.RUnlock()
-        for _, b := range i.completeBlocks {
-            _, ok := i.searchCompleteBlocks[b]
-            if ok {
-                // no need to search this block, we already did above
-                continue
-            }
+    i.blocksMtx.RLock()
+    defer i.blocksMtx.RUnlock()
 
-            err = b.SearchTagValues(ctx, tagName, distinctValues.Collect, common.DefaultSearchOptions())
-            if err == common.ErrUnsupported {
-                level.Warn(log.Logger).Log("msg", "block does not support tag value search", "blockID", b.BlockMeta().BlockID)
-                continue
-            }
-            if err != nil {
-                return nil, fmt.Errorf("unexpected error searching tag values (%s): %w", b.BlockMeta().BlockID, err)
-            }
-            if distinctValues.Exceeded() {
-                break
-            }
+    // search parquet wal/completing blocks/completed blocks
+    if err = search(i.headBlock, distinctValues); err != nil {
+        return nil, fmt.Errorf("unexpected error searching head block (%s): %w", i.headBlock.BlockMeta().BlockID, err)
+    }
+    for _, b := range i.completingBlocks {
+        if err = search(b, distinctValues); err != nil {
+            return nil, fmt.Errorf("unexpected error searching completing block (%s): %w", b.BlockMeta().BlockID, err)
+        }
+    }
+    for _, b := range i.completeBlocks {
+        _, ok := i.searchCompleteBlocks[b]
+        if ok {
+            // no need to search this block, we already did above
+            continue
+        }
+        if err = search(b, distinctValues); err != nil {
+            return nil, fmt.Errorf("unexpected error searching complete block (%s): %w", b.BlockMeta().BlockID, err)
         }
     }
b/modules/ingester/instance_search_test.go index 526b0530f85..24ca789430d 100644 --- a/modules/ingester/instance_search_test.go +++ b/modules/ingester/instance_search_test.go @@ -11,8 +11,6 @@ import ( "time" "github.com/google/uuid" - v2 "github.com/grafana/tempo/tempodb/encoding/v2" - "github.com/grafana/tempo/tempodb/encoding/vparquet" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/atomic" @@ -23,78 +21,76 @@ import ( "github.com/grafana/tempo/pkg/model/trace" "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/pkg/tempopb" + v1 "github.com/grafana/tempo/pkg/tempopb/common/v1" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/search" ) func TestInstanceSearch(t *testing.T) { - for _, b := range []bool{true, false} { - t.Run(fmt.Sprintf("flatbufferSearch:%t", b), func(t *testing.T) { - i, ingester, tempDir := defaultInstanceWithFlatBufferSearch(t, b) + i, ingester, tempDir := defaultInstanceAndTmpDir(t) - var tagKey = "foo" - var tagValue = "bar" - ids, _ := writeTracesWithSearchData(t, i, tagKey, tagValue, false) + var tagKey = "foo" + var tagValue = "bar" + ids, _ := writeTracesWithSearchData(t, i, tagKey, tagValue, false) - var req = &tempopb.SearchRequest{ - Tags: map[string]string{}, - } - req.Tags[tagKey] = tagValue + var req = &tempopb.SearchRequest{ + Tags: map[string]string{}, + } + req.Tags[tagKey] = tagValue + req.Limit = uint32(len(ids)) + 1 - sr, err := i.Search(context.Background(), req) - assert.NoError(t, err) - assert.Len(t, sr.Traces, len(ids)) - // todo: test that returned results are in sorted time order, create order of id's beforehand - checkEqual(t, ids, sr) + sr, err := i.Search(context.Background(), req) + assert.NoError(t, err) + assert.Len(t, sr.Traces, len(ids)) + // todo: test that returned results are in sorted time order, create order of id's beforehand + checkEqual(t, ids, sr) - // Test after appending to WAL - err = i.CutCompleteTraces(0, true) - require.NoError(t, err) - assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) + // Test after appending to WAL + err = i.CutCompleteTraces(0, true) + require.NoError(t, err) + assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) - sr, err = i.Search(context.Background(), req) - assert.NoError(t, err) - assert.Len(t, sr.Traces, len(ids)) - checkEqual(t, ids, sr) + sr, err = i.Search(context.Background(), req) + assert.NoError(t, err) + assert.Len(t, sr.Traces, len(ids)) + checkEqual(t, ids, sr) - // Test after cutting new headBlock - blockID, err := i.CutBlockIfReady(0, 0, true) - require.NoError(t, err) - assert.NotEqual(t, blockID, uuid.Nil) + // Test after cutting new headblock + blockID, err := i.CutBlockIfReady(0, 0, true) + require.NoError(t, err) + assert.NotEqual(t, blockID, uuid.Nil) - sr, err = i.Search(context.Background(), req) - assert.NoError(t, err) - assert.Len(t, sr.Traces, len(ids)) - checkEqual(t, ids, sr) + sr, err = i.Search(context.Background(), req) + assert.NoError(t, err) + assert.Len(t, sr.Traces, len(ids)) + checkEqual(t, ids, sr) - // Test after completing a block - err = i.CompleteBlock(blockID) - require.NoError(t, err) + // Test after completing a block + err = i.CompleteBlock(blockID) + require.NoError(t, err) - sr, err = i.Search(context.Background(), req) - assert.NoError(t, err) - assert.Len(t, sr.Traces, len(ids)) - checkEqual(t, ids, sr) + sr, err = i.Search(context.Background(), req) + assert.NoError(t, err) + assert.Len(t, sr.Traces, len(ids)) + 
checkEqual(t, ids, sr) - err = ingester.stopping(nil) - require.NoError(t, err) + err = ingester.stopping(nil) + require.NoError(t, err) - // create new ingester. this should replay wal! - ingester, _, _ = defaultIngester(t, tempDir, v2.VersionString) + // create new ingester. this should replay wal! + ingester, _, _ = defaultIngester(t, tempDir) - i, ok := ingester.getInstanceByID("fake") - require.True(t, ok) + i, ok := ingester.getInstanceByID("fake") + require.True(t, ok) - sr, err = i.Search(context.Background(), req) - assert.NoError(t, err) - assert.Len(t, sr.Traces, len(ids)) - checkEqual(t, ids, sr) + sr, err = i.Search(context.Background(), req) + assert.NoError(t, err) + assert.Len(t, sr.Traces, len(ids)) + checkEqual(t, ids, sr) - err = ingester.stopping(nil) - require.NoError(t, err) - }) - } + err = ingester.stopping(nil) + require.NoError(t, err) } // TestInstanceSearchTraceQL is duplicate of TestInstanceSearch for now @@ -107,7 +103,7 @@ func TestInstanceSearchTraceQL(t *testing.T) { for _, query := range queries { t.Run(fmt.Sprintf("Query:%s", query), func(t *testing.T) { - i, ingester, tmpDir := defaultInstanceWithParquet(t) + i, ingester, tmpDir := defaultInstanceAndTmpDir(t) // pushTracesToInstance creates traces with: // `service.name = "test-service"` and duration >= 1s _, ids := pushTracesToInstance(t, i, 10) @@ -156,7 +152,7 @@ func TestInstanceSearchTraceQL(t *testing.T) { require.NoError(t, err) // create new ingester. this should replay wal! - ingester, _, _ = defaultIngester(t, tmpDir, vparquet.VersionString) + ingester, _, _ = defaultIngester(t, tmpDir) i, ok := ingester.getInstanceByID("fake") require.True(t, ok) @@ -188,40 +184,36 @@ func checkEqual(t *testing.T, ids [][]byte, sr *tempopb.SearchResponse) { } func TestInstanceSearchTags(t *testing.T) { - for _, b := range []bool{true, false} { - t.Run(fmt.Sprintf("flatbufferSearch:%t", b), func(t *testing.T) { - i, _, _ := defaultInstanceWithFlatBufferSearch(t, b) + i, _ := defaultInstance(t) - // add dummy search data - var tagKey = "foo" - var tagValue = "bar" + // add dummy search data + var tagKey = "foo" + var tagValue = "bar" - _, expectedTagValues := writeTracesWithSearchData(t, i, tagKey, tagValue, true) + _, expectedTagValues := writeTracesWithSearchData(t, i, tagKey, tagValue, true) - userCtx := user.InjectOrgID(context.Background(), "fake") - testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues) + userCtx := user.InjectOrgID(context.Background(), "fake") + testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues) - // Test after appending to WAL - err := i.CutCompleteTraces(0, true) - require.NoError(t, err) - assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) + // Test after appending to WAL + err := i.CutCompleteTraces(0, true) + require.NoError(t, err) + assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) - testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues) + testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues) - // Test after cutting new headblock - blockID, err := i.CutBlockIfReady(0, 0, true) - require.NoError(t, err) - assert.NotEqual(t, blockID, uuid.Nil) + // Test after cutting new headblock + blockID, err := i.CutBlockIfReady(0, 0, true) + require.NoError(t, err) + assert.NotEqual(t, blockID, uuid.Nil) - testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues) + testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues) - // Test after completing a block - err = i.CompleteBlock(blockID) - require.NoError(t, err) + 
// Test after completing a block + err = i.CompleteBlock(blockID) + require.NoError(t, err) - testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues) - }) - } + testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues) } // nolint:revive,unparam @@ -232,6 +224,7 @@ func testSearchTagsAndValues(t *testing.T, ctx context.Context, i *instance, tag require.NoError(t, err) sort.Strings(srv.TagValues) + sort.Strings(expectedTagValues) assert.Contains(t, sr.TagNames, tagName) assert.Equal(t, expectedTagValues, srv.TagValues) } @@ -239,31 +232,28 @@ func testSearchTagsAndValues(t *testing.T, ctx context.Context, i *instance, tag // TestInstanceSearchMaxBytesPerTagValuesQueryReturnsPartial confirms that SearchTagValues returns // partial results if the bytes of the found tag value exceeds the MaxBytesPerTagValuesQuery limit func TestInstanceSearchMaxBytesPerTagValuesQueryReturnsPartial(t *testing.T) { - for _, b := range []bool{true, false} { - t.Run(fmt.Sprintf("flatbufferSearch:%t", b), func(t *testing.T) { - limits, err := overrides.NewOverrides(overrides.Limits{ - MaxBytesPerTagValuesQuery: 10, - }) - assert.NoError(t, err, "unexpected error creating limits") - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + limits, err := overrides.NewOverrides(overrides.Limits{ + MaxBytesPerTagValuesQuery: 10, + }) + assert.NoError(t, err, "unexpected error creating limits") + limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - tempDir := t.TempDir() + tempDir := t.TempDir() - ingester, _, _ := defaultIngester(t, tempDir, v2.VersionString) - i, err := newInstance("fake", limiter, ingester.store, ingester.local, b) - assert.NoError(t, err, "unexpected error creating new instance") + ingester, _, _ := defaultIngester(t, tempDir) + ingester.limiter = limiter + i, err := ingester.getOrCreateInstance("fake") + assert.NoError(t, err, "unexpected error creating new instance") - var tagKey = "foo" - var tagValue = "bar" + var tagKey = "foo" + var tagValue = "bar" - _, _ = writeTracesWithSearchData(t, i, tagKey, tagValue, true) + _, _ = writeTracesWithSearchData(t, i, tagKey, tagValue, true) - userCtx := user.InjectOrgID(context.Background(), "fake") - resp, err := i.SearchTagValues(userCtx, tagKey) - require.NoError(t, err) - require.Equal(t, 2, len(resp.TagValues)) // Only two values of the form "bar123" fit in the 10 byte limit above. - }) - } + userCtx := user.InjectOrgID(context.Background(), "fake") + resp, err := i.SearchTagValues(userCtx, tagKey) + require.NoError(t, err) + require.Equal(t, 2, len(resp.TagValues)) // Only two values of the form "bar123" fit in the 10 byte limit above. } // writes traces to the given instance along with search data. 
returns @@ -276,7 +266,6 @@ func writeTracesWithSearchData(t *testing.T, i *instance, tagKey string, tagValu dec := model.MustNewSegmentDecoder(model.CurrentEncoding) numTraces := 100 - searchAnnotatedFractionDenominator := 10 ids := [][]byte{} expectedTagValues := []string{} @@ -284,62 +273,50 @@ func writeTracesWithSearchData(t *testing.T, i *instance, tagKey string, tagValu id := make([]byte, 16) rand.Read(id) + tv := tagValue + if postFixValue { + tv = tv + strconv.Itoa(j) + } + kv := &v1.KeyValue{Key: tagKey, Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: tv}}} + expectedTagValues = append(expectedTagValues, tv) + ids = append(ids, id) + testTrace := test.MakeTrace(10, id) + testTrace.Batches[0].ScopeSpans[0].Spans[0].Attributes = append(testTrace.Batches[0].ScopeSpans[0].Spans[0].Attributes, kv) trace.SortTrace(testTrace) + traceBytes, err := dec.PrepareForWrite(testTrace, 0, 0) require.NoError(t, err) - // annotate just a fraction of traces with search data - var searchData []byte - if j%searchAnnotatedFractionDenominator == 0 { - tv := tagValue - if postFixValue { - tv = tv + strconv.Itoa(j) - } - - data := &tempofb.SearchEntryMutable{} - data.TraceID = id - data.AddTag(tagKey, tv) - searchData = data.ToBytes() - - expectedTagValues = append(expectedTagValues, tv) - ids = append(ids, data.TraceID) - } - // searchData will be nil if not - err = i.PushBytes(context.Background(), id, traceBytes, searchData) + err = i.PushBytes(context.Background(), id, traceBytes, nil) require.NoError(t, err) assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) } + // traces have to be cut to show up in searches + err := i.CutCompleteTraces(0, true) + require.NoError(t, err) + return ids, expectedTagValues } func TestInstanceSearchNoData(t *testing.T) { - for _, b := range []bool{true, false} { - t.Run(fmt.Sprintf("flatbufferSearch:%t", b), func(t *testing.T) { - i, _, _ := defaultInstanceWithFlatBufferSearch(t, b) + i, _ := defaultInstance(t) - var req = &tempopb.SearchRequest{ - Tags: map[string]string{}, - } - - sr, err := i.Search(context.Background(), req) - assert.NoError(t, err) - require.Len(t, sr.Traces, 0) - - }) + var req = &tempopb.SearchRequest{ + Tags: map[string]string{}, } + + sr, err := i.Search(context.Background(), req) + assert.NoError(t, err) + require.Len(t, sr.Traces, 0) } func TestInstanceSearchDoesNotRace(t *testing.T) { - limits, err := overrides.NewOverrides(overrides.Limits{}) - require.NoError(t, err) - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - - ingester, _, _ := defaultIngester(t, t.TempDir(), v2.VersionString) - i, err := newInstance("fake", limiter, ingester.store, ingester.local, false) + ingester, _, _ := defaultIngester(t, t.TempDir()) + i, err := ingester.getOrCreateInstance("fake") require.NoError(t, err) // This matches the encoding for live traces, since @@ -443,150 +420,141 @@ func TestInstanceSearchDoesNotRace(t *testing.T) { } func TestWALBlockDeletedDuringSearch(t *testing.T) { - for _, b := range []bool{true, false} { - t.Run(fmt.Sprintf("flatbufferSearch:%t", b), func(t *testing.T) { - i, _, _ := defaultInstanceWithFlatBufferSearch(t, b) - - // This matches the encoding for live traces, since - // we are pushing to the instance directly it must match. 
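The comment above captures the invariant that every test in this file relies on: bytes pushed directly into an instance must be marshaled with the current live-trace segment encoding. As a reference, a minimal sketch of that push pattern follows. The tempo calls (`model.MustNewSegmentDecoder`, `dec.PrepareForWrite`, `test.MakeTrace`, `instance.PushBytes`) are the ones used throughout this diff; the helper itself, its name, and the `crypto/rand` choice are illustrative assumptions, not repo code.

```go
// Hypothetical helper, not part of this PR: the push pattern shared by
// the tests in this file, assuming the imports visible in the diff.
package ingester

import (
	"context"
	"crypto/rand"
	"testing"

	"github.com/grafana/tempo/pkg/model"
	"github.com/grafana/tempo/pkg/util/test"
	"github.com/stretchr/testify/require"
)

func pushOneTrace(t *testing.T, i *instance) []byte {
	// Live traces must be marshaled with the current segment encoding,
	// because PushBytes stores the bytes exactly as given.
	dec := model.MustNewSegmentDecoder(model.CurrentEncoding)

	id := make([]byte, 16)
	_, err := rand.Read(id)
	require.NoError(t, err)

	traceBytes, err := dec.PrepareForWrite(test.MakeTrace(10, id), 0, 0)
	require.NoError(t, err)

	// The final argument used to carry flatbuffer search bytes; the
	// rewritten helpers in this diff pass nil and let the block format
	// serve search instead.
	err = i.PushBytes(context.Background(), id, traceBytes, nil)
	require.NoError(t, err)

	return id
}
```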
- dec := model.MustNewSegmentDecoder(model.CurrentEncoding) - - end := make(chan struct{}) - - concurrent := func(f func()) { - for { - select { - case <-end: - return - default: - f() - } - } - } - - for j := 0; j < 500; j++ { - id := make([]byte, 16) - rand.Read(id) + i, _ := defaultInstance(t) - trace := test.MakeTrace(10, id) - traceBytes, err := dec.PrepareForWrite(trace, 0, 0) - require.NoError(t, err) + // This matches the encoding for live traces, since + // we are pushing to the instance directly it must match. + dec := model.MustNewSegmentDecoder(model.CurrentEncoding) - entry := &tempofb.SearchEntryMutable{} - entry.TraceID = id - entry.AddTag("foo", "bar") - searchBytes := entry.ToBytes() + end := make(chan struct{}) - err = i.PushBytes(context.Background(), id, traceBytes, searchBytes) - require.NoError(t, err) + concurrent := func(f func()) { + for { + select { + case <-end: + return + default: + f() } + } + } - err := i.CutCompleteTraces(0, true) - require.NoError(t, err) + for j := 0; j < 500; j++ { + id := make([]byte, 16) + rand.Read(id) - blockID, err := i.CutBlockIfReady(0, 0, true) - require.NoError(t, err) + trace := test.MakeTrace(10, id) + traceBytes, err := dec.PrepareForWrite(trace, 0, 0) + require.NoError(t, err) - go concurrent(func() { - _, err := i.Search(context.Background(), &tempopb.SearchRequest{ - Tags: map[string]string{ - // Not present in the data, so it will be an exhaustive - // search - "wuv": "xyz", - }, - }) - require.NoError(t, err) - }) + entry := &tempofb.SearchEntryMutable{} + entry.TraceID = id + entry.AddTag("foo", "bar") + searchBytes := entry.ToBytes() + + err = i.PushBytes(context.Background(), id, traceBytes, searchBytes) + require.NoError(t, err) + } - // Let search get going - time.Sleep(100 * time.Millisecond) + err := i.CutCompleteTraces(0, true) + require.NoError(t, err) - err = i.ClearCompletingBlock(blockID) - require.NoError(t, err) + blockID, err := i.CutBlockIfReady(0, 0, true) + require.NoError(t, err) - // Wait for go funcs to quit before - // exiting and cleaning up - close(end) - time.Sleep(2 * time.Second) + go concurrent(func() { + _, err := i.Search(context.Background(), &tempopb.SearchRequest{ + Tags: map[string]string{ + // Not present in the data, so it will be an exhaustive + // search + "wuv": "xyz", + }, }) - } + require.NoError(t, err) + }) + + // Let search get going + time.Sleep(100 * time.Millisecond) + err = i.ClearCompletingBlock(blockID) + require.NoError(t, err) + + // Wait for go funcs to quit before + // exiting and cleaning up + close(end) + time.Sleep(2 * time.Second) } func TestInstanceSearchMetrics(t *testing.T) { - for _, b := range []bool{true, false} { - t.Run(fmt.Sprintf("flatbufferSearch:%t", b), func(t *testing.T) { - i, _, _ := defaultInstanceWithFlatBufferSearch(t, b) + i, _ := defaultInstance(t) - // This matches the encoding for live traces, since - // we are pushing to the instance directly it must match. - dec := model.MustNewSegmentDecoder(model.CurrentEncoding) + // This matches the encoding for live traces, since + // we are pushing to the instance directly it must match. 
+ dec := model.MustNewSegmentDecoder(model.CurrentEncoding) - numTraces := uint32(500) - numBytes := uint64(0) - for j := uint32(0); j < numTraces; j++ { - id := test.ValidTraceID(nil) + numTraces := uint32(500) + numBytes := uint64(0) + for j := uint32(0); j < numTraces; j++ { + id := test.ValidTraceID(nil) - // Trace bytes have to be pushed in the expected data encoding - trace := test.MakeTrace(10, id) + // Trace bytes have to be pushed in the expected data encoding + trace := test.MakeTrace(10, id) - traceBytes, err := dec.PrepareForWrite(trace, 0, 0) - require.NoError(t, err) + traceBytes, err := dec.PrepareForWrite(trace, 0, 0) + require.NoError(t, err) - data := &tempofb.SearchEntryMutable{} - data.TraceID = id - data.AddTag("foo", "bar") - searchData := data.ToBytes() + data := &tempofb.SearchEntryMutable{} + data.TraceID = id + data.AddTag("foo", "bar") + searchData := data.ToBytes() - numBytes += uint64(len(searchData)) + numBytes += uint64(len(searchData)) - err = i.PushBytes(context.Background(), id, traceBytes, searchData) - require.NoError(t, err) + err = i.PushBytes(context.Background(), id, traceBytes, searchData) + require.NoError(t, err) - assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) - } + assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) + } - search := func() *tempopb.SearchMetrics { - sr, err := i.Search(context.Background(), &tempopb.SearchRequest{ - // Exhaustive search - Tags: map[string]string{search.SecretExhaustiveSearchTag: "!"}, - }) - require.NoError(t, err) - return sr.Metrics - } + search := func() *tempopb.SearchMetrics { + sr, err := i.Search(context.Background(), &tempopb.SearchRequest{ + // Exhaustive search + Tags: map[string]string{search.SecretExhaustiveSearchTag: "!"}, + }) + require.NoError(t, err) + return sr.Metrics + } - // Live traces - m := search() - require.Equal(t, numTraces, m.InspectedTraces) - require.Equal(t, numBytes, m.InspectedBytes) - require.Equal(t, uint32(1), m.InspectedBlocks) // 1 head block + // Live traces + m := search() + require.Equal(t, numTraces, m.InspectedTraces) + require.Equal(t, numBytes, m.InspectedBytes) + require.Equal(t, uint32(1), m.InspectedBlocks) // 1 head block - // Test after appending to WAL - err := i.CutCompleteTraces(0, true) - require.NoError(t, err) - m = search() - require.Equal(t, numTraces, m.InspectedTraces) - require.Equal(t, numBytes, m.InspectedBytes) - require.Equal(t, uint32(1), m.InspectedBlocks) // 1 head block + // Test after appending to WAL + err := i.CutCompleteTraces(0, true) + require.NoError(t, err) + m = search() + require.Equal(t, numTraces, m.InspectedTraces) + require.Less(t, numBytes, m.InspectedBytes) + require.Equal(t, uint32(1), m.InspectedBlocks) // 1 head block - // Test after cutting new headblock - blockID, err := i.CutBlockIfReady(0, 0, true) - require.NoError(t, err) - m = search() - require.Equal(t, numTraces, m.InspectedTraces) - require.Equal(t, numBytes, m.InspectedBytes) - require.Equal(t, uint32(2), m.InspectedBlocks) // 1 head block, 1 completing block + // Test after cutting new headblock + blockID, err := i.CutBlockIfReady(0, 0, true) + require.NoError(t, err) + m = search() + require.Equal(t, numTraces, m.InspectedTraces) + require.Less(t, numBytes, m.InspectedBytes) + require.Equal(t, uint32(2), m.InspectedBlocks) // 1 head block, 1 completing block - // Test after completing a block - err = i.CompleteBlock(blockID) - require.NoError(t, err) - err = i.ClearCompletingBlock(blockID) - require.NoError(t, err) - m = search() - require.Equal(t, 
numTraces, m.InspectedTraces) - require.Equal(t, uint32(2), m.InspectedBlocks) // 1 head block, 1 complete block - }) - } + // Test after completing a block + err = i.CompleteBlock(blockID) + require.NoError(t, err) + err = i.ClearCompletingBlock(blockID) + require.NoError(t, err) + m = search() + require.Equal(t, numTraces, m.InspectedTraces) + require.Equal(t, uint32(2), m.InspectedBlocks) // 1 head block, 1 complete block } func BenchmarkInstanceSearchUnderLoad(b *testing.B) { diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index a1d90843602..e0065c7ac8c 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -8,8 +8,6 @@ import ( "time" "github.com/google/uuid" - v2 "github.com/grafana/tempo/tempodb/encoding/v2" - "github.com/grafana/tempo/tempodb/encoding/vparquet" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" @@ -216,7 +214,8 @@ func TestInstanceLimits(t *testing.T) { require.NoError(t, err, "unexpected error creating limits") limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - ingester, _, _ := defaultIngester(t, t.TempDir(), v2.VersionString) + ingester, _, _ := defaultIngester(t, t.TempDir()) + ingester.limiter = limiter type push struct { req *tempopb.PushBytesRequest @@ -293,7 +292,8 @@ func TestInstanceLimits(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - i, err := newInstance(testTenantID, limiter, ingester.store, ingester.local, false) + delete(ingester.instances, testTenantID) // force recreate instance to reset limits + i, err := ingester.getOrCreateInstance(testTenantID) require.NoError(t, err, "unexpected error creating new instance") for j, push := range tt.pushes { @@ -493,42 +493,44 @@ func TestInstanceMetrics(t *testing.T) { func TestInstanceFailsLargeTracesEvenAfterFlushing(t *testing.T) { ctx := context.Background() - maxTraceBytes := 100 + maxTraceBytes := 1000 id := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} - ingester, _, _ := defaultIngester(t, t.TempDir(), v2.VersionString) - limits, err := overrides.NewOverrides(overrides.Limits{ MaxBytesPerTrace: maxTraceBytes, }) require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i, err := newInstance(testTenantID, limiter, ingester.store, ingester.local, false) + ingester, _, _ := defaultIngester(t, t.TempDir()) + ingester.limiter = limiter + i, err := ingester.getOrCreateInstance(testTenantID) require.NoError(t, err) - pushFn := func(byteCount int) error { - return i.PushBytes(ctx, id, make([]byte, byteCount), nil) + req := makeRequestWithByteLimit(maxTraceBytes-200, id) + reqSize := 0 + for _, b := range req.Traces { + reqSize += len(b.Slice) } // Fill up trace to max - err = pushFn(maxTraceBytes) + err = i.PushBytesRequest(ctx, req) require.NoError(t, err) // Pushing again fails - err = pushFn(3) - require.Contains(t, err.Error(), (newTraceTooLargeError(id, i.instanceID, maxTraceBytes, 3)).Error()) + err = i.PushBytesRequest(ctx, req) + require.Contains(t, err.Error(), (newTraceTooLargeError(id, i.instanceID, maxTraceBytes, reqSize)).Error()) // Pushing still fails after flush err = i.CutCompleteTraces(0, true) require.NoError(t, err) - err = pushFn(5) - require.Contains(t, err.Error(), (newTraceTooLargeError(id, i.instanceID, maxTraceBytes, 5)).Error()) + err = i.PushBytesRequest(ctx, req) + require.Contains(t, err.Error(), (newTraceTooLargeError(id, i.instanceID, maxTraceBytes, 
reqSize)).Error()) // Cut block and then pushing works again _, err = i.CutBlockIfReady(0, 0, true) require.NoError(t, err) - err = pushFn(maxTraceBytes) + err = i.PushBytesRequest(ctx, req) require.NoError(t, err) } @@ -569,34 +571,15 @@ func TestSortByteSlices(t *testing.T) { } func defaultInstance(t testing.TB) (*instance, *Ingester) { - instance, ingester, _ := defaultInstanceWithFlatBufferSearch(t, false) + instance, ingester, _ := defaultInstanceAndTmpDir(t) return instance, ingester } -func defaultInstanceWithFlatBufferSearch(t testing.TB, fbSearch bool) (*instance, *Ingester, string) { - limits, err := overrides.NewOverrides(overrides.Limits{}) - require.NoError(t, err, "unexpected error creating limits") - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - +func defaultInstanceAndTmpDir(t testing.TB) (*instance, *Ingester, string) { tmpDir := t.TempDir() - ingester, _, _ := defaultIngester(t, tmpDir, v2.VersionString) - instance, err := newInstance(testTenantID, limiter, ingester.store, ingester.local, fbSearch) - require.NoError(t, err, "unexpected error creating new instance") - - return instance, ingester, tmpDir -} - -// defaultInstanceWithParquet returns an instance with vParquet WAL, and no trace data. -func defaultInstanceWithParquet(t testing.TB) (*instance, *Ingester, string) { - limits, err := overrides.NewOverrides(overrides.Limits{}) - require.NoError(t, err, "unexpected error creating limits") - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - - tmpDir := t.TempDir() - - ingester, _, _ := defaultIngester(t, tmpDir, vparquet.VersionString) - instance, err := newInstance(testTenantID, limiter, ingester.store, ingester.local, false) + ingester, _, _ := defaultIngester(t, tmpDir) + instance, err := ingester.getOrCreateInstance(testTenantID) require.NoError(t, err, "unexpected error creating new instance") return instance, ingester, tmpDir @@ -651,20 +634,14 @@ func BenchmarkInstanceFindTraceByIDFromCompleteBlock(b *testing.B) { } } -func BenchmarkInstanceSearchCompleteFB(b *testing.B) { - benchmarkInstanceSearch(b, true) -} func BenchmarkInstanceSearchCompleteParquet(b *testing.B) { - benchmarkInstanceSearch(b, false) -} -func TestInstanceSearchCompleteFB(t *testing.T) { - benchmarkInstanceSearch(t, true) + benchmarkInstanceSearch(b) } func TestInstanceSearchCompleteParquet(t *testing.T) { - benchmarkInstanceSearch(t, false) + benchmarkInstanceSearch(t) } -func benchmarkInstanceSearch(b testing.TB, fb bool) { - instance, _, _ := defaultInstanceWithFlatBufferSearch(b, fb) +func benchmarkInstanceSearch(b testing.TB) { + instance, _ := defaultInstance(b) for i := 0; i < 1000; i++ { request := makeRequest(nil) err := instance.PushBytesRequest(context.Background(), request) diff --git a/modules/querier/config.go b/modules/querier/config.go index d9af470569c..8d63c91a340 100644 --- a/modules/querier/config.go +++ b/modules/querier/config.go @@ -11,13 +11,13 @@ import ( // Config for a querier. 
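Before the struct itself, it is worth spelling out what the relocation below means for callers: the old top-level trace lookup timeout now lives under a nested trace_by_id block, alongside the raised concurrency defaults. A hedged sketch of inspecting the new defaults — `Config`, its fields, and `RegisterFlagsAndApplyDefaults` are taken from this diff; the wrapper program and flag-set name are assumptions:

```go
// Illustrative only: exercise the relocated trace-by-id timeout and the
// raised querier defaults after this change.
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/tempo/modules/querier"
)

func main() {
	cfg := &querier.Config{}
	cfg.RegisterFlagsAndApplyDefaults("querier", flag.NewFlagSet("tempo", flag.PanicOnError))

	// Previously cfg.TraceLookupQueryTimeout (yaml: querier.query_timeout),
	// now nested under querier.trace_by_id.query_timeout.
	fmt.Println(cfg.TraceByID.QueryTimeout) // 10s
	fmt.Println(cfg.MaxConcurrentQueries)   // 20, up from 5
	fmt.Println(cfg.Search.PreferSelf)      // 10, up from 2
}
```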
type Config struct { - Search SearchConfig `yaml:"search"` + Search SearchConfig `yaml:"search"` + TraceByID TraceByIDConfig `yaml:"trace_by_id"` - TraceLookupQueryTimeout time.Duration `yaml:"query_timeout"` - ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"` - MaxConcurrentQueries int `yaml:"max_concurrent_queries"` - Worker worker.Config `yaml:"frontend_worker"` - QueryRelevantIngesters bool `yaml:"query_relevant_ingesters"` + ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"` + MaxConcurrentQueries int `yaml:"max_concurrent_queries"` + Worker worker.Config `yaml:"frontend_worker"` + QueryRelevantIngesters bool `yaml:"query_relevant_ingesters"` } type SearchConfig struct { @@ -28,13 +28,17 @@ type SearchConfig struct { HedgeRequestsUpTo int `yaml:"external_hedge_requests_up_to"` } +type TraceByIDConfig struct { + QueryTimeout time.Duration `yaml:"query_timeout"` +} + // RegisterFlagsAndApplyDefaults register flags. func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { - cfg.TraceLookupQueryTimeout = 10 * time.Second + cfg.TraceByID.QueryTimeout = 10 * time.Second cfg.QueryRelevantIngesters = false cfg.ExtraQueryDelay = 0 - cfg.MaxConcurrentQueries = 5 - cfg.Search.PreferSelf = 2 + cfg.MaxConcurrentQueries = 20 + cfg.Search.PreferSelf = 10 cfg.Search.HedgeRequestsAt = 8 * time.Second cfg.Search.HedgeRequestsUpTo = 2 cfg.Search.QueryTimeout = 30 * time.Second diff --git a/modules/querier/http.go b/modules/querier/http.go index 820c27aedc1..e431a8933fb 100644 --- a/modules/querier/http.go +++ b/modules/querier/http.go @@ -29,7 +29,7 @@ const ( // TraceByIDHandler is a http.HandlerFunc to retrieve traces func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) { // Enforce the query timeout while querying backends - ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.TraceLookupQueryTimeout)) + ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.TraceByID.QueryTimeout)) defer cancel() span, ctx := opentracing.StartSpanFromContext(ctx, "Querier.TraceByIDHandler") diff --git a/modules/storage/config.go b/modules/storage/config.go index b63220caec9..250a723bfc5 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -15,11 +15,17 @@ import ( "github.com/grafana/tempo/tempodb/backend/s3" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" - v2 "github.com/grafana/tempo/tempodb/encoding/v2" "github.com/grafana/tempo/tempodb/pool" "github.com/grafana/tempo/tempodb/wal" ) +const ( + DefaultBloomFP = .01 + DefaultBloomShardSizeBytes = 100 * 1024 + DefaultIndexDownSampleBytes = 1024 * 1024 + DefaultIndexPageSizeBytes = 250 * 1024 +) + // Config is the Tempo storage configuration type Config struct { Trace tempodb.Config `yaml:"trace"` @@ -37,7 +43,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Trace.WAL = &wal.Config{} f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.") - cfg.Trace.WAL.Version = v2.VersionString cfg.Trace.WAL.Encoding = backend.EncSnappy cfg.Trace.WAL.SearchEncoding = backend.EncNone cfg.Trace.WAL.IngestionSlack = 2 * time.Minute @@ -49,10 +54,10 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Trace.Search.ReadBufferSizeBytes = tempodb.DefaultReadBufferSize cfg.Trace.Block = &common.BlockConfig{} - f.Float64Var(&cfg.Trace.Block.BloomFP, 
util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .01, "Bloom Filter False Positive.") - f.IntVar(&cfg.Trace.Block.BloomShardSizeBytes, util.PrefixConfig(prefix, "trace.block.bloom-filter-shard-size-bytes"), 100*1024, "Bloom Filter Shard Size in bytes.") - f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.") - f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.") + f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.v2-bloom-filter-false-positive"), DefaultBloomFP, "Bloom Filter False Positive.") + f.IntVar(&cfg.Trace.Block.BloomShardSizeBytes, util.PrefixConfig(prefix, "trace.block.v2-bloom-filter-shard-size-bytes"), DefaultBloomShardSizeBytes, "Bloom Filter Shard Size in bytes.") + f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.v2-index-downsample-bytes"), DefaultIndexDownSampleBytes, "Number of bytes (before compression) per index record.") + f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.v2-index-page-size-bytes"), DefaultIndexPageSizeBytes, "Number of bytes per index page.") cfg.Trace.Block.Version = encoding.DefaultEncoding().Version() cfg.Trace.Block.Encoding = backend.EncZstd cfg.Trace.Block.SearchEncoding = backend.EncSnappy @@ -88,6 +93,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Trace.BackgroundCache.WriteBackGoroutines = 10 cfg.Trace.Pool = &pool.Config{} - f.IntVar(&cfg.Trace.Pool.MaxWorkers, util.PrefixConfig(prefix, "trace.pool.max-workers"), 50, "Workers in the worker pool.") - f.IntVar(&cfg.Trace.Pool.QueueDepth, util.PrefixConfig(prefix, "trace.pool.queue-depth"), 10000, "Work item queue depth.") + f.IntVar(&cfg.Trace.Pool.MaxWorkers, util.PrefixConfig(prefix, "trace.pool.max-workers"), 400, "Workers in the worker pool.") + f.IntVar(&cfg.Trace.Pool.QueueDepth, util.PrefixConfig(prefix, "trace.pool.queue-depth"), 20000, "Work item queue depth.") } diff --git a/operations/jsonnet-compiled/ConfigMap-tempo-compactor.yaml b/operations/jsonnet-compiled/ConfigMap-tempo-compactor.yaml index be150db161e..c02d676fe74 100644 --- a/operations/jsonnet-compiled/ConfigMap-tempo-compactor.yaml +++ b/operations/jsonnet-compiled/ConfigMap-tempo-compactor.yaml @@ -4,7 +4,7 @@ data: compactor: compaction: block_retention: 144h - chunk_size_bytes: 1.048576e+07 + v2_in_buffer_bytes: 1.048576e+07 ring: kvstore: store: memberlist diff --git a/operations/jsonnet-compiled/Deployment-compactor.yaml b/operations/jsonnet-compiled/Deployment-compactor.yaml index 69ddc32ab85..398f10e3de1 100644 --- a/operations/jsonnet-compiled/Deployment-compactor.yaml +++ b/operations/jsonnet-compiled/Deployment-compactor.yaml @@ -18,7 +18,7 @@ spec: template: metadata: annotations: - config_hash: 4b31dabe47e305e29ccc562cf6212642 + config_hash: 99e04853e18fdcffc37bcbd793023bee labels: app: compactor name: compactor diff --git a/operations/jsonnet-compiled/util/jsonnetfile.lock.json b/operations/jsonnet-compiled/util/jsonnetfile.lock.json index 2378228edd4..e3ac98d8d2e 100644 --- a/operations/jsonnet-compiled/util/jsonnetfile.lock.json +++ b/operations/jsonnet-compiled/util/jsonnetfile.lock.json @@ -8,7 +8,7 @@ "subdir": "ksonnet-util" } }, - "version": "e6a0083d9cc0f0ec79507397ce0e156d558f6efb", + "version": 
"a924ab1b5fd4e6eacd7235a20978d050a27bdb65", "sum": "/pkNOLhRqvQoPA0yYdUuJvpPHqhkCLauAUMD2ZHMIkE=" }, { @@ -18,7 +18,7 @@ "subdir": "memcached" } }, - "version": "e6a0083d9cc0f0ec79507397ce0e156d558f6efb", + "version": "a924ab1b5fd4e6eacd7235a20978d050a27bdb65", "sum": "SWywAq4U0MRPMbASU0Ez8O9ArRNeoZzb75sEuReueow=" }, { diff --git a/operations/jsonnet/microservices/configmap.libsonnet b/operations/jsonnet/microservices/configmap.libsonnet index ea9567ba51b..f9713a17627 100644 --- a/operations/jsonnet/microservices/configmap.libsonnet +++ b/operations/jsonnet/microservices/configmap.libsonnet @@ -77,7 +77,7 @@ tempo_compactor_config:: $.tempo_config { compactor+: { compaction+: { - chunk_size_bytes: 10485760, + v2_in_buffer_bytes: 10485760, block_retention: '144h', }, ring+: { diff --git a/operations/tempo-mixin/runbook.md b/operations/tempo-mixin/runbook.md index 17908eab6aa..d56160ef9e9 100644 --- a/operations/tempo-mixin/runbook.md +++ b/operations/tempo-mixin/runbook.md @@ -15,6 +15,7 @@ parallelism - - Number of shards each query is split into, configured via ``` query_frontend: + trace_by_id: query_shards: 10 ``` - Number of Queriers (each of these process the sharded queries in parallel). This can be changed by modifying the size of the @@ -115,7 +116,7 @@ There are several settings which can be tuned to reduce the amount of work done 30 minutes. It could be reduced even further in extremely high volume situations. - max_block_bytes - The maximum size of an output block, and controls which input blocks will be compacted. Can be reduced to as little as a few GB to prevent really large compactions. -- chunk_size_bytes - The amount of (compressed) data buffered from each input block. Can be reduced to a few megabytes to buffer +- v2_in_buffer_bytes - The amount of (compressed) data buffered from each input block. Can be reduced to a few megabytes to buffer less. Will increase the amount of reads from the backend. - flush_size_bytes - The amount of data buffered of the output block. Can be reduced to flush more frequently to the backend. There are platform-specific limits on how low this can go. 
On AWS S3 it cannot be set lower than 5MB, and a single block cannot cause more than 10K flushes diff --git a/tempodb/compactor.go b/tempodb/compactor.go index 22f5ce84e61..f4ff6aebe89 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -24,9 +24,9 @@ const ( DefaultCompactionCycle = 30 * time.Second - DefaultFlushSizeBytes uint32 = 20 * 1024 * 1024 // 20 MiB - - DefaultIteratorBufferSize = 1000 + DefaultChunkSizeBytes = 5 * 1024 * 1024 // 5 MiB + DefaultFlushSizeBytes uint32 = 20 * 1024 * 1024 // 20 MiB + DefaultIteratorBufferSize = 1000 ) var ( diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 49856498673..bb6d5cd355d 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -93,7 +93,6 @@ func testCompactionRoundtrip(t *testing.T, targetBlockVersion string) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -238,7 +237,6 @@ func testSameIDCompaction(t *testing.T, targetBlockVersion string) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -380,7 +378,6 @@ func TestCompactionUpdatesBlocklist(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -450,7 +447,6 @@ func TestCompactionMetrics(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -523,7 +519,6 @@ func TestCompactionIteratesThroughTenants(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -595,7 +590,6 @@ func testCompactionHonorsBlockStartEndTimes(t *testing.T, targetBlockVersion str }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, IngestionSlack: time.Since(time.Unix(0, 0)), // Let us use obvious start/end times below }, BlocklistPoll: 0, diff --git a/tempodb/config.go b/tempodb/config.go index 94f0f991ec7..047c1d032fd 100644 --- a/tempodb/config.go +++ b/tempodb/config.go @@ -27,8 +27,8 @@ const ( DefaultPrefetchTraceCount = 1000 DefaultSearchChunkSizeBytes = 1_000_000 - DefaultReadBufferCount = 8 - DefaultReadBufferSize = 4 * 1024 * 1024 + DefaultReadBufferCount = 32 + DefaultReadBufferSize = 1 * 1024 * 1024 ) // Config holds the entirety of tempodb configuration @@ -103,20 +103,24 @@ func (c SearchConfig) ApplyToOptions(o *common.SearchOptions) { // CompactorConfig contains compaction configuration options type CompactorConfig struct { - ChunkSizeBytes uint32 `yaml:"chunk_size_bytes"` - FlushSizeBytes uint32 `yaml:"flush_size_bytes"` + ChunkSizeBytes uint32 `yaml:"v2_in_buffer_bytes"` + FlushSizeBytes uint32 `yaml:"v2_out_buffer_bytes"` + IteratorBufferSize int `yaml:"v2_prefetch_traces_count"` MaxCompactionRange time.Duration `yaml:"compaction_window"` MaxCompactionObjects int `yaml:"max_compaction_objects"` MaxBlockBytes uint64 `yaml:"max_block_bytes"` BlockRetention time.Duration `yaml:"block_retention"` CompactedBlockRetention time.Duration `yaml:"compacted_block_retention"` RetentionConcurrency uint `yaml:"retention_concurrency"` - IteratorBufferSize int `yaml:"iterator_buffer_size"` MaxTimePerTenant time.Duration `yaml:"max_time_per_tenant"` CompactionCycle time.Duration `yaml:"compaction_cycle"` } func validateConfig(cfg *Config) error { + if cfg == nil { + return errors.New("config should be non-nil") +
} + if cfg.WAL == nil { return errors.New("wal config should be non-nil") } @@ -125,6 +129,11 @@ func validateConfig(cfg *Config) error { return errors.New("block config should be non-nil") } + // if the wal version is unspecified default to the block version + if cfg.WAL.Version == "" { + cfg.WAL.Version = cfg.Block.Version + } + err := wal.ValidateConfig(cfg.WAL) if err != nil { return fmt.Errorf("wal config validation failed: %w", err) diff --git a/tempodb/config_test.go b/tempodb/config_test.go index 5e7c6db5ad4..f6200b1de70 100644 --- a/tempodb/config_test.go +++ b/tempodb/config_test.go @@ -1,9 +1,11 @@ package tempodb import ( + "errors" "testing" "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/grafana/tempo/tempodb/wal" "github.com/stretchr/testify/require" ) @@ -38,3 +40,89 @@ func TestApplyToOptions(t *testing.T) { require.Equal(t, cfg.ReadBufferCount, 6) require.Equal(t, cfg.ReadBufferSizeBytes, 7) } + +func TestValidateConfig(t *testing.T) { + tests := []struct { + cfg *Config + expectedConfig *Config + err error + }{ + // nil config fails + { + err: errors.New("config should be non-nil"), + }, + // nil wal fails + { + cfg: &Config{}, + err: errors.New("wal config should be non-nil"), + }, + // nil block fails + { + cfg: &Config{ + WAL: &wal.Config{}, + }, + err: errors.New("block config should be non-nil"), + }, + // block version copied to wal if empty + { + cfg: &Config{ + WAL: &wal.Config{}, + Block: &common.BlockConfig{ + IndexDownsampleBytes: 1, + IndexPageSizeBytes: 1, + BloomFP: 0.01, + BloomShardSizeBytes: 1, + Version: "v2", + }, + }, + expectedConfig: &Config{ + WAL: &wal.Config{ + Version: "v2", + }, + Block: &common.BlockConfig{ + IndexDownsampleBytes: 1, + IndexPageSizeBytes: 1, + BloomFP: 0.01, + BloomShardSizeBytes: 1, + Version: "v2", + }, + }, + }, + // block version not copied to wal if populated + { + cfg: &Config{ + WAL: &wal.Config{ + Version: "vParquet", + }, + Block: &common.BlockConfig{ + IndexDownsampleBytes: 1, + IndexPageSizeBytes: 1, + BloomFP: 0.01, + BloomShardSizeBytes: 1, + Version: "v2", + }, + }, + expectedConfig: &Config{ + WAL: &wal.Config{ + Version: "vParquet", + }, + Block: &common.BlockConfig{ + IndexDownsampleBytes: 1, + IndexPageSizeBytes: 1, + BloomFP: 0.01, + BloomShardSizeBytes: 1, + Version: "v2", + }, + }, + }, + } + + for _, test := range tests { + err := validateConfig(test.cfg) + require.Equal(t, test.err, err) + + if test.expectedConfig != nil { + require.Equal(t, test.expectedConfig, test.cfg) + } + } +} diff --git a/tempodb/encoding/common/config.go b/tempodb/encoding/common/config.go index 7233499ca49..ef1b32367a8 100644 --- a/tempodb/encoding/common/config.go +++ b/tempodb/encoding/common/config.go @@ -8,17 +8,19 @@ import ( // BlockConfig holds configuration options for newly created blocks type BlockConfig struct { - IndexDownsampleBytes int `yaml:"index_downsample_bytes"` - IndexPageSizeBytes int `yaml:"index_page_size_bytes"` - BloomFP float64 `yaml:"bloom_filter_false_positive"` - BloomShardSizeBytes int `yaml:"bloom_filter_shard_size_bytes"` - Version string `yaml:"version"` - Encoding backend.Encoding `yaml:"encoding"` - SearchEncoding backend.Encoding `yaml:"search_encoding"` - SearchPageSizeBytes int `yaml:"search_page_size_bytes"` + BloomFP float64 `yaml:"bloom_filter_false_positive"` + BloomShardSizeBytes int `yaml:"bloom_filter_shard_size_bytes"` + Version string `yaml:"version"` + SearchEncoding backend.Encoding `yaml:"search_encoding"` + SearchPageSizeBytes int `yaml:"search_page_size_bytes"` 
+ + // v2 fields + IndexDownsampleBytes int `yaml:"v2_index_downsample_bytes"` + IndexPageSizeBytes int `yaml:"v2_index_page_size_bytes"` + Encoding backend.Encoding `yaml:"v2_encoding"` // parquet fields - RowGroupSizeBytes int `yaml:"row_group_size_bytes"` + RowGroupSizeBytes int `yaml:"parquet_row_group_size_bytes"` } // ValidateConfig returns true if the config is valid diff --git a/tempodb/encoding/vparquet/wal_block.go b/tempodb/encoding/vparquet/wal_block.go index 9ad4a7d40b8..5eaa681a6ae 100644 --- a/tempodb/encoding/vparquet/wal_block.go +++ b/tempodb/encoding/vparquet/wal_block.go @@ -75,6 +75,10 @@ func openWALBlock(filename string, path string, ingestionSlack time.Duration, _ return nil, nil, fmt.Errorf("error unmarshaling wal meta json: %s %w", metaPath, err) } + // below we're going to iterate all of the parquet files in the wal and build the meta, this will correctly + // recount total objects so clear them out here + meta.TotalObjects = 0 + b := &walBlock{ meta: meta, path: path, diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index 2006d5b3a76..06cc1b5f483 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -16,7 +16,6 @@ import ( "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" - v2 "github.com/grafana/tempo/tempodb/encoding/v2" "github.com/grafana/tempo/tempodb/wal" ) @@ -38,7 +37,6 @@ func TestRetention(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -100,7 +98,6 @@ func TestRetentionUpdatesBlocklistImmediately(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -168,7 +165,6 @@ func TestBlockRetentionOverride(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 361f46849bc..89ac2c52059 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -218,6 +218,12 @@ func (rw *readerWriter) CompleteBlockWithBackend(ctx context.Context, block comm return nil, err } + // force flush anything left in the wal + err = block.Flush() + if err != nil { + return nil, fmt.Errorf("error flushing wal block: %w", err) + } + iter, err := block.Iterator() if err != nil { return nil, err diff --git a/tempodb/tempodb_search_test.go b/tempodb/tempodb_search_test.go index 5b0ddfa2dfb..4357bb6d20c 100644 --- a/tempodb/tempodb_search_test.go +++ b/tempodb/tempodb_search_test.go @@ -44,7 +44,6 @@ func testSearchCompleteBlock(t *testing.T, blockVersion string) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, IngestionSlack: time.Since(time.Time{}), }, Search: &SearchConfig{ diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 0a4cc03af86..e9d5138b6c0 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -53,7 +53,6 @@ func testConfig(t *testing.T, enc backend.Encoding, blocklistPoll time.Duration, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: blocklistPoll, } @@ -214,7 +213,7 @@ func checkBlocklists(t *testing.T, expectedID uuid.UUID, expectedB int, expected rw.pollBlocklist() blocklist := rw.blocklist.Metas(testTenantID) - assert.Len(t, blocklist, expectedB) + require.Len(t, blocklist, 
expectedB) if expectedB > 0 && expectedID != uuid.Nil { assert.Equal(t, expectedID, blocklist[0].BlockID) } @@ -503,7 +502,7 @@ func TestIncludeCompactedBlock(t *testing.T) { } func TestSearchCompactedBlocks(t *testing.T) { - r, w, c, _ := testConfig(t, backend.EncLZ4_256k, time.Minute) + r, w, c, _ := testConfig(t, backend.EncLZ4_256k, time.Hour) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -587,18 +586,19 @@ func TestCompleteBlock(t *testing.T) { } func testCompleteBlock(t *testing.T, from, to string) { - _, w, _, _ := testConfig(t, backend.EncLZ4_256k, time.Minute, func(c *Config) { - c.WAL.Version = from - c.Block.Version = to + c.Block.Version = from // temporarily set config to from while we create the wal, so it makes blocks in the "from" format }) wal := w.WAL() + rw := w.(*readerWriter) + rw.cfg.Block.Version = to // now set it back so we cut blocks in the "to" format blockID := uuid.New() block, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) require.NoError(t, err, "unexpected error creating block") + require.Equal(t, block.BlockMeta().Version, from) dec := model.MustNewSegmentDecoder(model.CurrentEncoding) @@ -617,6 +617,7 @@ func testCompleteBlock(t *testing.T, from, to string) { complete, err := w.CompleteBlock(context.Background(), block) require.NoError(t, err, "unexpected error completing block") + require.Equal(t, complete.BlockMeta().Version, to) for i, id := range ids { found, err := complete.FindTraceByID(context.TODO(), id, common.DefaultSearchOptions()) @@ -656,7 +657,6 @@ func testCompleteBlockHonorsStartStopTimes(t *testing.T, targetBlockVersion stri WAL: &wal.Config{ IngestionSlack: time.Minute, Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -704,7 +704,6 @@ func TestShouldCache(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), - Version: v2.VersionString, }, BlocklistPoll: 0, CacheMaxBlockAge: time.Hour, @@ -767,16 +766,14 @@ func writeTraceToWal(t require.TestingT, b common.WALBlock, dec model.SegmentDec func BenchmarkCompleteBlock(b *testing.B) { enc := encoding.AllEncodings() - for _, from := range enc { - for _, to := range enc { - b.Run(fmt.Sprintf("%s->%s", from.Version(), to.Version()), func(b *testing.B) { - benchmarkCompleteBlock(b, from, to) - }) - } + for _, e := range enc { + b.Run(e.Version(), func(b *testing.B) { + benchmarkCompleteBlock(b, e) + }) } } -func benchmarkCompleteBlock(b *testing.B, from, to encoding.VersionedEncoding) { +func benchmarkCompleteBlock(b *testing.B, e encoding.VersionedEncoding) { // Create a WAL block with traces traceCount := 10_000 flushCount := 1000 @@ -793,13 +790,12 @@ func benchmarkCompleteBlock(b *testing.B, from, to encoding.VersionedEncoding) { BloomShardSizeBytes: 100_000, Encoding: backend.EncNone, IndexPageSizeBytes: 1000, - Version: to.Version(), + Version: e.Version(), RowGroupSizeBytes: 30_000_000, }, WAL: &wal.Config{ IngestionSlack: time.Minute, Filepath: path.Join(tempDir, "wal"), - Version: from.Version(), }, BlocklistPoll: 0, }, log.NewNopLogger()) diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 3e38df2386d..6c99d3a8759 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -30,15 +30,15 @@ type Config struct { Filepath string `yaml:"path"` CompletedFilepath string BlocksFilepath string - Encoding backend.Encoding `yaml:"encoding"` + Encoding backend.Encoding `yaml:"v2_encoding"` SearchEncoding backend.Encoding `yaml:"search_encoding"` - Version string 
`yaml:"version"` IngestionSlack time.Duration `yaml:"ingestion_time_range_slack"` + Version string `yaml:"version,omitempty"` } -func ValidateConfig(b *Config) error { - if _, err := encoding.FromVersion(b.Version); err != nil { - return err +func ValidateConfig(c *Config) error { + if _, err := encoding.FromVersion(c.Version); err != nil { + return fmt.Errorf("failed to validate block version %s: %w", c.Version, err) } return nil diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index fb95673e5d9..ee811ca0072 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -45,6 +45,7 @@ func TestCompletedDirIsRemoved(t *testing.T) { _, err = New(&Config{ Filepath: tempDir, + Version: encoding.DefaultEncoding().Version(), }) require.NoError(t, err, "unexpected error creating temp wal") @@ -69,6 +70,7 @@ func testAppendBlockStartEnd(t *testing.T, e encoding.VersionedEncoding) { Filepath: t.TempDir(), Encoding: backend.EncNone, IngestionSlack: 3 * time.Minute, + Version: encoding.DefaultEncoding().Version(), }) require.NoError(t, err, "unexpected error creating temp wal") @@ -129,6 +131,7 @@ func testIngestionSlack(t *testing.T, e encoding.VersionedEncoding) { Filepath: t.TempDir(), Encoding: backend.EncNone, IngestionSlack: time.Minute, + Version: encoding.DefaultEncoding().Version(), }) require.NoError(t, err, "unexpected error creating temp wal") @@ -319,6 +322,7 @@ func TestInvalidFilesAndFoldersAreHandled(t *testing.T) { wal, err := New(&Config{ Filepath: tempDir, Encoding: backend.EncGZIP, + Version: encoding.DefaultEncoding().Version(), }) require.NoError(t, err, "unexpected error creating temp wal") @@ -363,16 +367,17 @@ func TestInvalidFilesAndFoldersAreHandled(t *testing.T) { require.DirExists(t, filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e+tenant+vOther")) } -func runWALTest(t testing.TB, dbEncoding string, runner func([][]byte, []*tempopb.Trace, common.WALBlock)) { +func runWALTest(t testing.TB, encoding string, runner func([][]byte, []*tempopb.Trace, common.WALBlock)) { wal, err := New(&Config{ Filepath: t.TempDir(), Encoding: backend.EncNone, + Version: encoding, }) require.NoError(t, err, "unexpected error creating temp wal") blockID := uuid.New() - block, err := wal.newBlock(blockID, testTenantID, model.CurrentEncoding, dbEncoding) + block, err := wal.newBlock(blockID, testTenantID, model.CurrentEncoding, encoding) require.NoError(t, err, "unexpected error creating block") enc := model.MustNewSegmentDecoder(model.CurrentEncoding) @@ -511,6 +516,7 @@ func runWALBenchmark(b *testing.B, encoding string, flushCount int, runner func( wal, err := New(&Config{ Filepath: b.TempDir(), Encoding: backend.EncNone, + Version: encoding, }) require.NoError(b, err, "unexpected error creating temp wal")
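Finally, the wal.Config change above (version is now `omitempty`) pairs with the validateConfig change in tempodb/config.go: leaving the WAL version unset pins it to the block version. A minimal sketch of that behavior follows, mirroring the passing "block version copied to wal if empty" case in config_test.go; it is written as if it lived in an in-package test file, which is an assumption of the sketch.

```go
// Sketch of the wal-version pinning added in this PR. Field values mirror
// the "block version copied to wal if empty" case in config_test.go.
package tempodb

import (
	"fmt"

	"github.com/grafana/tempo/tempodb/encoding/common"
	"github.com/grafana/tempo/tempodb/wal"
)

func ExampleValidateConfig() {
	cfg := &Config{
		WAL: &wal.Config{}, // version left empty on purpose
		Block: &common.BlockConfig{
			IndexDownsampleBytes: 1,
			IndexPageSizeBytes:   1,
			BloomFP:              0.01,
			BloomShardSizeBytes:  1,
			Version:              "v2",
		},
	}

	if err := validateConfig(cfg); err != nil {
		fmt.Println(err)
		return
	}

	// The WAL block version is now pinned to the storage block version.
	fmt.Println(cfg.WAL.Version)
	// Output: v2
}
```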