Skip to content

Commit

Permalink
fix: queue stat should look for alldone marker and exit
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Mitchell <[email protected]>
  • Loading branch information
starpit committed Nov 1, 2024
1 parent b26db2d commit 8932bd2
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pkg/observe/queuestreamer/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func once(ctx context.Context, c client, modelChan chan Model, doneChan chan str
fmt.Fprintf(os.Stderr, "Listen bucket=%s path=%s\n", c.RunContext.Bucket, prefix)
}

// We'll keep an eye out for this marker. If we see it, the run is done.
allDoneMarker := c.RunContext.AsFile(queue.AllDoneMarker)

objects, errs := c.s3.Listen(c.RunContext.Bucket, prefix, "", true)
defer c.s3.StopListening(c.RunContext.Bucket)

Expand Down Expand Up @@ -86,6 +89,13 @@ func once(ctx context.Context, c client, modelChan chan Model, doneChan chan str
if c.LogOptions.Verbose {
fmt.Fprintf(os.Stderr, "Queue streamer got push notification object=%s\n", obj)
}

if obj == allDoneMarker {
if c.LogOptions.Verbose {
fmt.Fprintln(os.Stderr, "Queue streamer got all done")
}
return nil
}
}

// fetch and parse model
Expand Down

0 comments on commit 8932bd2

Please sign in to comment.