Skip to content

Commit ab7af05

Browse files
authored
refactor(blooms): Builder retrieves tasks from planner (#13046)
1 parent cc3694e commit ab7af05

File tree

8 files changed

+392
-32
lines changed

8 files changed

+392
-32
lines changed

docs/sources/shared/configuration.md

+10
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,15 @@ bloom_build:
356356
[max_queued_tasks_per_tenant: <int> | default = 30000]
357357

358358
builder:
359+
# The grpc_client block configures the gRPC client used to communicate
360+
# between a client and server component in Loki.
361+
# The CLI flags prefix for this block configuration is:
362+
# bloom-build.builder.grpc
363+
[grpc_config: <grpc_client>]
364+
365+
# Hostname (and port) of the bloom planner
366+
# CLI flag: -bloom-build.builder.planner-address
367+
[planner_address: <string> | default = ""]
359368

360369
# Experimental: The bloom_gateway block configures the Loki bloom gateway
361370
# server, responsible for serving queries for filtering chunks based on filter
@@ -2335,6 +2344,7 @@ The `gcs_storage_config` block configures the connection to Google Cloud Storage
23352344
The `grpc_client` block configures the gRPC client used to communicate between a client and server component in Loki. The supported CLI flags `<prefix>` used to reference this configuration block are:
23362345

23372346
- `bigtable`
2347+
- `bloom-build.builder.grpc`
23382348
- `bloom-gateway-client.grpc`
23392349
- `boltdb.shipper.index-gateway-client.grpc`
23402350
- `frontend.grpc-client-config`

pkg/bloombuild/builder/builder.go

+126-11
Original file line numberDiff line numberDiff line change
@@ -2,49 +2,164 @@ package builder
22

33
import (
44
"context"
5+
"fmt"
6+
"time"
57

68
"github.com/go-kit/log"
9+
"github.com/go-kit/log/level"
10+
"github.com/google/uuid"
711
"github.com/grafana/dskit/services"
12+
"github.com/pkg/errors"
813
"github.com/prometheus/client_golang/prometheus"
14+
"google.golang.org/grpc"
915

16+
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
1017
utillog "github.com/grafana/loki/v3/pkg/util/log"
1118
)
1219

13-
type Worker struct {
20+
type Builder struct {
1421
services.Service
1522

23+
ID string
24+
1625
cfg Config
1726
metrics *Metrics
1827
logger log.Logger
28+
29+
client protos.PlannerForBuilderClient
1930
}
2031

2132
func New(
2233
cfg Config,
2334
logger log.Logger,
2435
r prometheus.Registerer,
25-
) (*Worker, error) {
36+
) (*Builder, error) {
2637
utillog.WarnExperimentalUse("Bloom Builder", logger)
2738

28-
w := &Worker{
39+
b := &Builder{
40+
ID: uuid.NewString(),
2941
cfg: cfg,
3042
metrics: NewMetrics(r),
3143
logger: logger,
3244
}
3345

34-
w.Service = services.NewBasicService(w.starting, w.running, w.stopping)
35-
return w, nil
46+
b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
47+
return b, nil
48+
}
49+
50+
func (b *Builder) starting(_ context.Context) error {
51+
b.metrics.running.Set(1)
52+
return nil
3653
}
3754

38-
func (w *Worker) starting(_ context.Context) (err error) {
39-
w.metrics.running.Set(1)
40-
return err
55+
func (b *Builder) stopping(_ error) error {
56+
if b.client != nil {
57+
req := &protos.NotifyBuilderShutdownRequest{
58+
BuilderID: b.ID,
59+
}
60+
if _, err := b.client.NotifyBuilderShutdown(context.Background(), req); err != nil {
61+
level.Error(b.logger).Log("msg", "failed to notify planner about builder shutdown", "err", err)
62+
}
63+
}
64+
65+
b.metrics.running.Set(0)
66+
return nil
4167
}
4268

43-
func (w *Worker) stopping(_ error) error {
44-
w.metrics.running.Set(0)
69+
func (b *Builder) running(ctx context.Context) error {
70+
opts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
71+
if err != nil {
72+
return fmt.Errorf("failed to create grpc dial options: %w", err)
73+
}
74+
75+
// TODO: Wrap hereafter in retry logic
76+
conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...)
77+
if err != nil {
78+
return fmt.Errorf("failed to dial bloom planner: %w", err)
79+
}
80+
81+
b.client = protos.NewPlannerForBuilderClient(conn)
82+
83+
c, err := b.client.BuilderLoop(ctx)
84+
if err != nil {
85+
return fmt.Errorf("failed to start builder loop: %w", err)
86+
}
87+
88+
// Start processing tasks from planner
89+
if err := b.builderLoop(c); err != nil {
90+
return fmt.Errorf("builder loop failed: %w", err)
91+
}
92+
4593
return nil
4694
}
4795

48-
func (w *Worker) running(_ context.Context) error {
96+
func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error {
97+
// Send ready message to planner
98+
if err := c.Send(&protos.BuilderToPlanner{BuilderID: b.ID}); err != nil {
99+
return fmt.Errorf("failed to send ready message to planner: %w", err)
100+
}
101+
102+
for b.State() == services.Running {
103+
// When the planner connection closes or the builder stops, the context
104+
// will be canceled and the loop will exit.
105+
protoTask, err := c.Recv()
106+
if err != nil {
107+
if errors.Is(c.Context().Err(), context.Canceled) {
108+
level.Debug(b.logger).Log("msg", "builder loop context canceled")
109+
return nil
110+
}
111+
112+
return fmt.Errorf("failed to receive task from planner: %w", err)
113+
}
114+
115+
b.metrics.taskStarted.Inc()
116+
start := time.Now()
117+
status := statusSuccess
118+
119+
err = b.processTask(c.Context(), protoTask.Task)
120+
if err != nil {
121+
status = statusFailure
122+
level.Error(b.logger).Log("msg", "failed to process task", "err", err)
123+
}
124+
125+
b.metrics.taskCompleted.WithLabelValues(status).Inc()
126+
b.metrics.taskDuration.WithLabelValues(status).Observe(time.Since(start).Seconds())
127+
128+
// Acknowledge task completion to planner
129+
if err = b.notifyTaskCompletedToPlanner(c, err); err != nil {
130+
return fmt.Errorf("failed to notify task completion to planner: %w", err)
131+
}
132+
}
133+
134+
level.Debug(b.logger).Log("msg", "builder loop stopped")
135+
return nil
136+
}
137+
138+
func (b *Builder) notifyTaskCompletedToPlanner(c protos.PlannerForBuilder_BuilderLoopClient, err error) error {
139+
var errMsg string
140+
if err != nil {
141+
errMsg = err.Error()
142+
}
143+
144+
// TODO: Implement retry
145+
if err := c.Send(&protos.BuilderToPlanner{
146+
BuilderID: b.ID,
147+
Error: errMsg,
148+
}); err != nil {
149+
return fmt.Errorf("failed to acknowledge task completion to planner: %w", err)
150+
}
151+
return nil
152+
}
153+
154+
func (b *Builder) processTask(_ context.Context, protoTask *protos.ProtoTask) error {
155+
task, err := protos.FromProtoTask(protoTask)
156+
if err != nil {
157+
return fmt.Errorf("failed to convert proto task to task: %w", err)
158+
}
159+
160+
level.Debug(b.logger).Log("msg", "received task", "task", task.ID)
161+
162+
// TODO: Implement task processing
163+
49164
return nil
50165
}
+123
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package builder
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"testing"
8+
"time"
9+
10+
"github.com/go-kit/log"
11+
"github.com/grafana/dskit/flagext"
12+
"github.com/grafana/dskit/services"
13+
"github.com/prometheus/client_golang/prometheus"
14+
"github.com/stretchr/testify/require"
15+
"google.golang.org/grpc"
16+
17+
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
18+
)
19+
20+
func Test_BuilderLoop(t *testing.T) {
21+
logger := log.NewNopLogger()
22+
23+
tasks := make([]*protos.ProtoTask, 256)
24+
for i := range tasks {
25+
tasks[i] = &protos.ProtoTask{
26+
Id: fmt.Sprintf("task-%d", i),
27+
}
28+
}
29+
30+
server, err := newFakePlannerServer(tasks)
31+
require.NoError(t, err)
32+
33+
cfg := Config{
34+
PlannerAddress: server.Addr(),
35+
}
36+
flagext.DefaultValues(&cfg.GrpcConfig)
37+
38+
builder, err := New(cfg, logger, prometheus.DefaultRegisterer)
39+
require.NoError(t, err)
40+
t.Cleanup(func() {
41+
err = services.StopAndAwaitTerminated(context.Background(), builder)
42+
require.NoError(t, err)
43+
44+
server.Stop()
45+
})
46+
47+
err = services.StartAndAwaitRunning(context.Background(), builder)
48+
require.NoError(t, err)
49+
50+
require.Eventually(t, func() bool {
51+
return server.completedTasks == len(tasks)
52+
}, 5*time.Second, 100*time.Millisecond)
53+
54+
err = services.StopAndAwaitTerminated(context.Background(), builder)
55+
require.NoError(t, err)
56+
57+
require.True(t, server.shutdownCalled)
58+
}
59+
60+
type fakePlannerServer struct {
61+
tasks []*protos.ProtoTask
62+
completedTasks int
63+
shutdownCalled bool
64+
65+
addr string
66+
grpcServer *grpc.Server
67+
}
68+
69+
func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) {
70+
lis, err := net.Listen("tcp", "localhost:0")
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
server := &fakePlannerServer{
76+
tasks: tasks,
77+
addr: lis.Addr().String(),
78+
grpcServer: grpc.NewServer(),
79+
}
80+
81+
protos.RegisterPlannerForBuilderServer(server.grpcServer, server)
82+
go func() {
83+
if err := server.grpcServer.Serve(lis); err != nil {
84+
panic(err)
85+
}
86+
}()
87+
88+
return server, nil
89+
}
90+
91+
func (f *fakePlannerServer) Addr() string {
92+
return f.addr
93+
}
94+
95+
func (f *fakePlannerServer) Stop() {
96+
f.grpcServer.Stop()
97+
}
98+
99+
func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer) error {
100+
// Receive Ready
101+
if _, err := srv.Recv(); err != nil {
102+
return fmt.Errorf("failed to receive ready: %w", err)
103+
}
104+
105+
for _, task := range f.tasks {
106+
if err := srv.Send(&protos.PlannerToBuilder{Task: task}); err != nil {
107+
return fmt.Errorf("failed to send task: %w", err)
108+
}
109+
if _, err := srv.Recv(); err != nil {
110+
return fmt.Errorf("failed to receive task response: %w", err)
111+
}
112+
f.completedTasks++
113+
}
114+
115+
// No more tasks. Wait until shutdown.
116+
<-srv.Context().Done()
117+
return nil
118+
}
119+
120+
func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) {
121+
f.shutdownCalled = true
122+
return &protos.NotifyBuilderShutdownResponse{}, nil
123+
}

pkg/bloombuild/builder/config.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,33 @@
11
package builder
22

3-
import "flag"
3+
import (
4+
"flag"
5+
"fmt"
6+
7+
"github.com/grafana/dskit/grpcclient"
8+
)
49

510
// Config configures the bloom-builder component.
611
type Config struct {
7-
// TODO: Add config
12+
GrpcConfig grpcclient.Config `yaml:"grpc_config"`
13+
PlannerAddress string `yaml:"planner_address"`
814
}
915

1016
// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
11-
func (cfg *Config) RegisterFlagsWithPrefix(_ string, _ *flag.FlagSet) {
12-
// TODO: Register flags with flagsPrefix
17+
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
18+
f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner")
19+
cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f)
1320
}
1421

1522
func (cfg *Config) Validate() error {
23+
if cfg.PlannerAddress == "" {
24+
return fmt.Errorf("planner address is required")
25+
}
26+
27+
if err := cfg.GrpcConfig.Validate(); err != nil {
28+
return fmt.Errorf("grpc config is invalid: %w", err)
29+
}
30+
1631
return nil
1732
}
1833

0 commit comments

Comments
 (0)