-
Notifications
You must be signed in to change notification settings - Fork 548
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
block-builder: Add an option to consume a partition completely till cycle end offset with no partial region #10506
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,7 +78,7 @@ func NewTSDBBuilder(logger log.Logger, dataDir string, blocksStorageCfg mimir_ts | |
// lastBlockMax: max time of the block in the previous block building cycle. | ||
// blockMax: max time of the block in the current block building cycle. This blockMax is exclusive of the last sample by design in TSDB. | ||
// recordAlreadyProcessed: true if the record was processed in the previous cycle. (It gets processed again if some samples did not fit in the previous cycle.) | ||
func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordAlreadyProcessed bool) (_ bool, err error) { | ||
func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordAlreadyProcessed, processEverything bool) (_ bool, err error) { | ||
userID := string(rec.Key) | ||
|
||
req := mimirpb.PreallocWriteRequest{ | ||
|
@@ -133,12 +133,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax | |
ref, copiedLabels := app.GetRef(nonCopiedLabels, hash) | ||
|
||
for _, s := range ts.Samples { | ||
if s.TimestampMs >= blockMax { | ||
if !processEverything && s.TimestampMs >= blockMax { | ||
// We will process this sample in the next cycle. | ||
allSamplesProcessed = false | ||
continue | ||
} | ||
if recordAlreadyProcessed && s.TimestampMs < lastBlockMax { | ||
if !processEverything && recordAlreadyProcessed && s.TimestampMs < lastBlockMax { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As I understand it, the first time we restart blockbuilders with this flag enabled, we will produce a small number of duplicate records. (records that had …) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, migrating from the old way to the new way will create some duplicate samples in blocks, which is harmless since the compactor will merge them together. |
||
// This sample was already processed in the previous cycle. | ||
continue | ||
} | ||
|
@@ -170,12 +170,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax | |
} | ||
|
||
for _, h := range ts.Histograms { | ||
if h.Timestamp >= blockMax { | ||
if !processEverything && h.Timestamp >= blockMax { | ||
// We will process this sample in the next cycle. | ||
allSamplesProcessed = false | ||
continue | ||
} | ||
if recordAlreadyProcessed && h.Timestamp < lastBlockMax { | ||
if !processEverything && recordAlreadyProcessed && h.Timestamp < lastBlockMax { | ||
// This sample was already processed in the previous cycle. | ||
continue | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, you had mentioned this, but I didn't quite see the blatant duplication. 👍