Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(blooms): Builder retrieves tasks from planner #13046

Merged
merged 12 commits into from
May 29, 2024
Merged
10 changes: 10 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,15 @@ bloom_build:
[max_queued_tasks_per_tenant: <int> | default = 30000]

builder:
# The grpc_client block configures the gRPC client used to communicate
# between a client and server component in Loki.
# The CLI flags prefix for this block configuration is:
# bloom-build.builder.grpc
[grpc_config: <grpc_client>]

# Hostname (and port) of the bloom planner
# CLI flag: -bloom-build.builder.planner-address
[planner_address: <string> | default = ""]

# Experimental: The bloom_gateway block configures the Loki bloom gateway
# server, responsible for serving queries for filtering chunks based on filter
Expand Down Expand Up @@ -2335,6 +2344,7 @@ The `gcs_storage_config` block configures the connection to Google Cloud Storage
The `grpc_client` block configures the gRPC client used to communicate between a client and server component in Loki. The supported CLI flags `<prefix>` used to reference this configuration block are:

- `bigtable`
- `bloom-build.builder.grpc`
- `bloom-gateway-client.grpc`
- `boltdb.shipper.index-gateway-client.grpc`
- `frontend.grpc-client-config`
Expand Down
137 changes: 126 additions & 11 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,164 @@ package builder

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
utillog "github.com/grafana/loki/v3/pkg/util/log"
)

type Worker struct {
// Builder is the bloom-builder service. It connects to the bloom planner,
// receives tasks over a gRPC stream and acknowledges each of them.
type Builder struct {
// Service is assigned in New from services.NewBasicService, wired to the
// starting, running and stopping methods of this type.
services.Service

// ID is a UUID string generated in New. It is sent to the planner as the
// BuilderID of every message.
ID string

cfg Config
metrics *Metrics
logger log.Logger

// client is the planner gRPC client. It stays nil until running has dialed
// the planner, which is why stopping checks it before use.
client protos.PlannerForBuilderClient
}

func New(
cfg Config,
logger log.Logger,
r prometheus.Registerer,
) (*Worker, error) {
) (*Builder, error) {
utillog.WarnExperimentalUse("Bloom Builder", logger)

w := &Worker{
b := &Builder{
ID: uuid.NewString(),
cfg: cfg,
metrics: NewMetrics(r),
logger: logger,
}

w.Service = services.NewBasicService(w.starting, w.running, w.stopping)
return w, nil
b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
return b, nil
}

// starting is the service start hook passed to services.NewBasicService.
// It only flags the builder as running in the metrics and never fails.
func (b *Builder) starting(_ context.Context) error {
	b.metrics.running.Set(1)

	return nil
}

func (w *Worker) starting(_ context.Context) (err error) {
w.metrics.running.Set(1)
return err
// stopping is the service stop hook passed to services.NewBasicService.
//
// If a planner client was created, the planner is told that this builder is
// shutting down. That notification is best effort: a failure is logged and
// otherwise ignored. The running gauge is reset afterwards and the hook
// always returns nil.
func (b *Builder) stopping(_ error) error {
	if client := b.client; client != nil {
		_, err := client.NotifyBuilderShutdown(
			context.Background(),
			&protos.NotifyBuilderShutdownRequest{BuilderID: b.ID},
		)
		if err != nil {
			level.Error(b.logger).Log("msg", "failed to notify planner about builder shutdown", "err", err)
		}
	}

	b.metrics.running.Set(0)
	return nil
}

func (w *Worker) stopping(_ error) error {
w.metrics.running.Set(0)
// running is the service run hook passed to services.NewBasicService.
//
// It builds the gRPC dial options from the configuration, dials the planner
// at cfg.PlannerAddress, stores the resulting client on the Builder, opens the
// BuilderLoop stream and then processes tasks until builderLoop returns.
// Every failure is wrapped with the step that failed.
//
// NOTE(review): the connection dialed here is never closed. Closing it in this
// function looks unsafe because stopping still issues an RPC through b.client;
// confirm the hook ordering before adding a Close.
func (b *Builder) running(ctx context.Context) error {
	dialOpts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
	if err != nil {
		return fmt.Errorf("failed to create grpc dial options: %w", err)
	}

	// TODO: Wrap hereafter in retry logic
	plannerConn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, dialOpts...)
	if err != nil {
		return fmt.Errorf("failed to dial bloom planner: %w", err)
	}
	b.client = protos.NewPlannerForBuilderClient(plannerConn)

	stream, err := b.client.BuilderLoop(ctx)
	if err != nil {
		return fmt.Errorf("failed to start builder loop: %w", err)
	}

	// Start processing tasks from planner
	if loopErr := b.builderLoop(stream); loopErr != nil {
		return fmt.Errorf("builder loop failed: %w", loopErr)
	}
	return nil
}

func (w *Worker) running(_ context.Context) error {
// builderLoop drives the task stream opened against the planner.
//
// It first sends a message carrying only the BuilderID to signal readiness.
// Then, for as long as the service state is Running, it receives a task,
// processes it, records the task metrics and sends an acknowledgement back.
//
// It returns nil when the stream context was canceled or the service left the
// Running state, and a wrapped error when sending or receiving on the stream
// fails for any other reason.
func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error {
// Send ready message to planner
if err := c.Send(&protos.BuilderToPlanner{BuilderID: b.ID}); err != nil {
return fmt.Errorf("failed to send ready message to planner: %w", err)
}

for b.State() == services.Running {
// When the planner connection closes or the builder stops, the context
// will be canceled and the loop will exit.
protoTask, err := c.Recv()
if err != nil {
// The stream's own context (not err) is inspected to tell an orderly
// cancellation apart from a genuine receive failure.
if errors.Is(c.Context().Err(), context.Canceled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should check err for context.Canceled, or, check if c.Context() == nil before c.Recv()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the connection context is canceled, c.Recv() will return right after calling it.
(I actually had it as you suggest but noticed the behaviour while testing and decided to remove it for simplicity)

level.Debug(b.logger).Log("msg", "builder loop context canceled")
return nil
}

return fmt.Errorf("failed to receive task from planner: %w", err)
}

b.metrics.taskStarted.Inc()
start := time.Now()
status := statusSuccess

// A task failure is logged and reported to the planner in the
// acknowledgement below; it does not end the loop.
err = b.processTask(c.Context(), protoTask.Task)
if err != nil {
status = statusFailure
level.Error(b.logger).Log("msg", "failed to process task", "err", err)
}

b.metrics.taskCompleted.WithLabelValues(status).Inc()
b.metrics.taskDuration.WithLabelValues(status).Observe(time.Since(start).Seconds())

// Acknowledge task completion to planner
if err = b.notifyTaskCompletedToPlanner(c, err); err != nil {
return fmt.Errorf("failed to notify task completion to planner: %w", err)
}
}

level.Debug(b.logger).Log("msg", "builder loop stopped")
return nil
}

// notifyTaskCompletedToPlanner acknowledges the task that was just processed
// by sending a message on the stream c.
//
// The message always carries the BuilderID. Its Error field holds err's text
// when err is non-nil and is left empty otherwise. A failed send is returned
// wrapped; a successful send returns nil.
func (b *Builder) notifyTaskCompletedToPlanner(c protos.PlannerForBuilder_BuilderLoopClient, err error) error {
	ack := &protos.BuilderToPlanner{BuilderID: b.ID}
	if err != nil {
		ack.Error = err.Error()
	}

	// TODO: Implement retry
	if sendErr := c.Send(ack); sendErr != nil {
		return fmt.Errorf("failed to acknowledge task completion to planner: %w", sendErr)
	}
	return nil
}

// processTask converts the wire-format task into its in-memory form and logs
// that it was received. The context is currently unused and the actual task
// processing is not implemented yet, so the only possible error is a failed
// conversion.
func (b *Builder) processTask(_ context.Context, protoTask *protos.ProtoTask) error {
	task, convErr := protos.FromProtoTask(protoTask)
	if convErr != nil {
		return fmt.Errorf("failed to convert proto task to task: %w", convErr)
	}

	level.Debug(b.logger).Log("msg", "received task", "task", task.ID)

	// TODO: Implement task processing

	return nil
}
123 changes: 123 additions & 0 deletions pkg/bloombuild/builder/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package builder

import (
	"context"
	"fmt"
	"net"
	"sync/atomic"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/flagext"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"

	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
)

// Test_BuilderLoop runs a Builder against a fake planner and verifies that
// every task handed out is acknowledged and that the planner is told about the
// builder's shutdown.
func Test_BuilderLoop(t *testing.T) {
	logger := log.NewNopLogger()

	tasks := make([]*protos.ProtoTask, 256)
	for i := range tasks {
		tasks[i] = &protos.ProtoTask{
			Id: fmt.Sprintf("task-%d", i),
		}
	}

	server, err := newFakePlannerServer(tasks)
	require.NoError(t, err)
	// Registered right away so the listener is released even if a later
	// require aborts the test. Cleanups run last-in-first-out, so the builder
	// (registered below) is still stopped before the server.
	t.Cleanup(server.Stop)

	cfg := Config{
		PlannerAddress: server.Addr(),
	}
	flagext.DefaultValues(&cfg.GrpcConfig)

	// Use a private registry instead of prometheus.DefaultRegisterer so that
	// repeated runs (e.g. -count=2) or other tests constructing a Builder do
	// not share process-wide metric registrations.
	builder, err := New(cfg, logger, prometheus.NewRegistry())
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), builder))
	})

	err = services.StartAndAwaitRunning(context.Background(), builder)
	require.NoError(t, err)

	require.Eventually(t, func() bool {
		return int(server.completedTasks.Load()) == len(tasks)
	}, 5*time.Second, 100*time.Millisecond)

	err = services.StopAndAwaitTerminated(context.Background(), builder)
	require.NoError(t, err)

	require.True(t, server.shutdownCalled.Load())
}

// fakePlannerServer is an in-process planner used by the tests. It hands out a
// fixed list of tasks over the BuilderLoop stream and records what the builder
// reported back.
type fakePlannerServer struct {
	tasks []*protos.ProtoTask

	// completedTasks and shutdownCalled are written from gRPC handler
	// goroutines and read from the test goroutine, so they are atomics.
	completedTasks atomic.Int64
	shutdownCalled atomic.Bool

	addr       string
	grpcServer *grpc.Server
}

// newFakePlannerServer starts a gRPC server on a free localhost port that
// serves the given tasks. The caller is responsible for calling Stop.
func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) {
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return nil, err
	}

	server := &fakePlannerServer{
		tasks:      tasks,
		addr:       lis.Addr().String(),
		grpcServer: grpc.NewServer(),
	}

	protos.RegisterPlannerForBuilderServer(server.grpcServer, server)
	go func() {
		if err := server.grpcServer.Serve(lis); err != nil {
			panic(err)
		}
	}()

	return server, nil
}

// Addr returns the host:port the fake planner is listening on.
func (f *fakePlannerServer) Addr() string {
	return f.addr
}

// Stop shuts the gRPC server down.
func (f *fakePlannerServer) Stop() {
	f.grpcServer.Stop()
}

// BuilderLoop implements the planner side of the stream: it waits for the
// builder's ready message, then sends each task and waits for its
// acknowledgement before sending the next one. Once all tasks are done it
// blocks until the stream context ends.
func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer) error {
	// Receive Ready
	if _, err := srv.Recv(); err != nil {
		return fmt.Errorf("failed to receive ready: %w", err)
	}

	for _, task := range f.tasks {
		if err := srv.Send(&protos.PlannerToBuilder{Task: task}); err != nil {
			return fmt.Errorf("failed to send task: %w", err)
		}
		if _, err := srv.Recv(); err != nil {
			return fmt.Errorf("failed to receive task response: %w", err)
		}
		f.completedTasks.Add(1)
	}

	// No more tasks. Wait until shutdown.
	<-srv.Context().Done()
	return nil
}

// NotifyBuilderShutdown records that a builder announced its shutdown.
func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) {
	f.shutdownCalled.Store(true)
	return &protos.NotifyBuilderShutdownResponse{}, nil
}
23 changes: 19 additions & 4 deletions pkg/bloombuild/builder/config.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,33 @@
package builder

import "flag"
import (
"flag"
"fmt"

"github.com/grafana/dskit/grpcclient"
)

// Config configures the bloom-builder component.
type Config struct {
// GrpcConfig holds the settings of the gRPC client; the builder derives its
// dial options for the planner connection from it.
GrpcConfig grpcclient.Config `yaml:"grpc_config"`
// PlannerAddress is the hostname (and port) of the bloom planner. Validate
// rejects an empty value.
PlannerAddress string `yaml:"planner_address"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-builder configuration.
func (cfg *Config) RegisterFlagsWithPrefix(_ string, _ *flag.FlagSet) {
// TODO: Register flags with flagsPrefix
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	// Flag names are derived from the caller's prefix: "<prefix>.planner-address"
	// for the planner endpoint and "<prefix>.grpc" as the prefix of the nested
	// gRPC client flags.
	const plannerAddressUsage = "Hostname (and port) of the bloom planner"

	f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", plannerAddressUsage)
	cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f)
}

// Validate checks the builder configuration. It fails when no planner address
// is set or when the nested gRPC client configuration is invalid, and returns
// nil otherwise.
func (cfg *Config) Validate() error {
	if len(cfg.PlannerAddress) == 0 {
		return fmt.Errorf("planner address is required")
	}

	grpcErr := cfg.GrpcConfig.Validate()
	if grpcErr != nil {
		return fmt.Errorf("grpc config is invalid: %w", grpcErr)
	}
	return nil
}

Expand Down
Loading
Loading