Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log retried errors #2613

Merged
merged 2 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 57 additions & 41 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,20 +889,25 @@ func (p *DockerProvider) BuildImage(ctx context.Context, img ImageBuildInfo) (st

var buildError error
var resp types.ImageBuildResponse
err = backoff.Retry(func() error {
resp, err = p.client.ImageBuild(ctx, buildOptions.Context, buildOptions)
if err != nil {
buildError = errors.Join(buildError, err)
if isPermanentClientError(err) {
return backoff.Permanent(err)
err = backoff.RetryNotify(
func() error {
resp, err = p.client.ImageBuild(ctx, buildOptions.Context, buildOptions)
if err != nil {
buildError = errors.Join(buildError, err)
if isPermanentClientError(err) {
return backoff.Permanent(err)
}
return err
}
Logger.Printf("Failed to build image: %s, will retry", err)
return err
}
defer p.Close()

return nil
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
defer p.Close()

return nil
},
backoff.WithContext(backoff.NewExponentialBackOff(), ctx),
func(err error, duration time.Duration) {
p.Logger.Printf("Failed to build image: %s, will retry", err)
},
)
if err != nil {
return "", errors.Join(buildError, err)
}
Expand Down Expand Up @@ -1015,7 +1020,7 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
}

if modifiedTag != imageName {
Logger.Printf("✍🏼 Replacing image with %s. From: %s to %s\n", is.Description(), imageName, modifiedTag)
p.Logger.Printf("✍🏼 Replacing image with %s. From: %s to %s\n", is.Description(), imageName, modifiedTag)
imageName = modifiedTag
}
}
Expand Down Expand Up @@ -1180,23 +1185,29 @@ func (p *DockerProvider) findContainerByName(ctx context.Context, name string) (
}

func (p *DockerProvider) waitContainerCreation(ctx context.Context, name string) (*types.Container, error) {
var ctr *types.Container
return ctr, backoff.Retry(func() error {
c, err := p.findContainerByName(ctx, name)
if err != nil {
if !errdefs.IsNotFound(err) && isPermanentClientError(err) {
return backoff.Permanent(err)
return backoff.RetryNotifyWithData(
func() (*types.Container, error) {
c, err := p.findContainerByName(ctx, name)
if err != nil {
if !errdefs.IsNotFound(err) && isPermanentClientError(err) {
return nil, backoff.Permanent(err)
}
return nil, err
}
return err
}

if c == nil {
return fmt.Errorf("container %s not found", name)
}

ctr = c
return nil
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
if c == nil {
return nil, errdefs.NotFound(fmt.Errorf("container %s not found", name))
}
return c, nil
},
backoff.WithContext(backoff.NewExponentialBackOff(), ctx),
func(err error, duration time.Duration) {
if errdefs.IsNotFound(err) {
return
}
p.Logger.Printf("Waiting for container. Got an error: %v; Retrying in %d seconds", err, duration/time.Second)
},
)
}

func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (Container, error) {
Expand Down Expand Up @@ -1283,19 +1294,24 @@ func (p *DockerProvider) attemptToPullImage(ctx context.Context, tag string, pul
}

var pull io.ReadCloser
err = backoff.Retry(func() error {
pull, err = p.client.ImagePull(ctx, tag, pullOpt)
if err != nil {
if isPermanentClientError(err) {
return backoff.Permanent(err)
err = backoff.RetryNotify(
func() error {
pull, err = p.client.ImagePull(ctx, tag, pullOpt)
if err != nil {
if isPermanentClientError(err) {
return backoff.Permanent(err)
}
return err
}
Logger.Printf("Failed to pull image: %s, will retry", err)
return err
}
defer p.Close()

return nil
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
defer p.Close()

return nil
},
backoff.WithContext(backoff.NewExponentialBackOff(), ctx),
func(err error, duration time.Duration) {
p.Logger.Printf("Failed to pull image: %s, will retry", err)
},
)
if err != nil {
return err
}
Expand Down
48 changes: 27 additions & 21 deletions lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,33 +215,39 @@ var defaultReadinessHook = func() ContainerLifecycleHooks {
b.MaxElapsedTime = 1 * time.Second
b.MaxInterval = 5 * time.Second

err := backoff.Retry(func() error {
jsonRaw, err := dockerContainer.inspectRawContainer(ctx)
if err != nil {
return err
}
err := backoff.RetryNotify(
func() error {
jsonRaw, err := dockerContainer.inspectRawContainer(ctx)
if err != nil {
return err
}

exposedAndMappedPorts := jsonRaw.NetworkSettings.Ports

for _, exposedPort := range dockerContainer.exposedPorts {
portMap := nat.Port(exposedPort)
// having entries in exposedAndMappedPorts, where the key is the exposed port,
// and the value is the mapped port, means that the port has been already mapped.
if _, ok := exposedAndMappedPorts[portMap]; !ok {
// check if the port is mapped with the protocol (default is TCP)
if !strings.Contains(exposedPort, "/") {
portMap = nat.Port(fmt.Sprintf("%s/tcp", exposedPort))
if _, ok := exposedAndMappedPorts[portMap]; !ok {
exposedAndMappedPorts := jsonRaw.NetworkSettings.Ports

for _, exposedPort := range dockerContainer.exposedPorts {
portMap := nat.Port(exposedPort)
// having entries in exposedAndMappedPorts, where the key is the exposed port,
// and the value is the mapped port, means that the port has been already mapped.
if _, ok := exposedAndMappedPorts[portMap]; !ok {
// check if the port is mapped with the protocol (default is TCP)
if !strings.Contains(exposedPort, "/") {
portMap = nat.Port(fmt.Sprintf("%s/tcp", exposedPort))
if _, ok := exposedAndMappedPorts[portMap]; !ok {
return fmt.Errorf("port %s is not mapped yet", exposedPort)
}
} else {
return fmt.Errorf("port %s is not mapped yet", exposedPort)
}
} else {
return fmt.Errorf("port %s is not mapped yet", exposedPort)
}
}
}

return nil
}, b)
return nil
},
b,
func(err error, duration time.Duration) {
dockerContainer.logger.Printf("All requested ports were not exposed: %v", err)
},
)
if err != nil {
return fmt.Errorf("all exposed ports, %s, were not mapped in 5s: %w", dockerContainer.exposedPorts, err)
}
Expand Down
109 changes: 52 additions & 57 deletions modules/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"path/filepath"
"testing"
"time"

ch "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand Down Expand Up @@ -95,7 +96,7 @@ func TestClickHouseConnectionHost(t *testing.T) {
defer conn.Close()

// perform assertions
data, err := performCRUD(conn)
data, err := performCRUD(t, conn)
require.NoError(t, err)
assert.Len(t, data, 1)
}
Expand Down Expand Up @@ -132,7 +133,7 @@ func TestClickHouseDSN(t *testing.T) {
defer conn.Close()

// perform assertions
data, err := performCRUD(conn)
data, err := performCRUD(t, conn)
require.NoError(t, err)
assert.Len(t, data, 1)
}
Expand Down Expand Up @@ -223,7 +224,7 @@ func TestClickHouseWithConfigFile(t *testing.T) {
defer conn.Close()

// perform assertions
data, err := performCRUD(conn)
data, err := performCRUD(t, conn)
require.NoError(t, err)
assert.Len(t, data, 1)
})
Expand Down Expand Up @@ -287,75 +288,69 @@ func TestClickHouseWithZookeeper(t *testing.T) {
defer conn.Close()

// perform assertions
data, err := performReplicatedCRUD(conn)
data, err := performReplicatedCRUD(t, conn)
require.NoError(t, err)
assert.Len(t, data, 1)
}

func performReplicatedCRUD(conn driver.Conn) ([]Test, error) {
var (
err error
res []Test
)

err = backoff.Retry(func() error {
err = conn.Exec(context.Background(), "CREATE TABLE replicated_test_table (id UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/mdb.data_transfer_cp_cdc', '{replica}') PRIMARY KEY (id) ORDER BY (id) SETTINGS index_granularity = 8192;")
if err != nil {
return err
}

err = conn.Exec(context.Background(), "INSERT INTO replicated_test_table (id) VALUES (1);")
if err != nil {
return err
}

rows, err := conn.Query(context.Background(), "SELECT * FROM replicated_test_table;")
if err != nil {
return err
}
func performReplicatedCRUD(t *testing.T, conn driver.Conn) ([]Test, error) {
return backoff.RetryNotifyWithData(
func() ([]Test, error) {
err := conn.Exec(context.Background(), "CREATE TABLE replicated_test_table (id UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/mdb.data_transfer_cp_cdc', '{replica}') PRIMARY KEY (id) ORDER BY (id) SETTINGS index_granularity = 8192;")
if err != nil {
return nil, err
}

for rows.Next() {
var r Test
err = conn.Exec(context.Background(), "INSERT INTO replicated_test_table (id) VALUES (1);")
if err != nil {
return nil, err
}

err := rows.Scan(&r.Id)
rows, err := conn.Query(context.Background(), "SELECT * FROM replicated_test_table;")
if err != nil {
return err
return nil, err
}

res = append(res, r)
}
return nil
}, backoff.NewExponentialBackOff())
var res []Test
for rows.Next() {
var r Test

return res, err
}
err := rows.Scan(&r.Id)
if err != nil {
return nil, err
}

func performCRUD(conn driver.Conn) ([]Test, error) {
var (
err error
rows []Test
res = append(res, r)
}
return res, nil
},
backoff.NewExponentialBackOff(),
func(err error, duration time.Duration) {
t.Log(err)
},
)
}

err = backoff.Retry(func() error {
err = conn.Exec(context.Background(), "create table if not exists test_table (id UInt64) engine = MergeTree PRIMARY KEY (id) ORDER BY (id) SETTINGS index_granularity = 8192;")
if err != nil {
return err
}

err = conn.Exec(context.Background(), "INSERT INTO test_table (id) VALUES (1);")
if err != nil {
return err
}

rows, err = getAllRows(conn)
if err != nil {
return err
}
func performCRUD(t *testing.T, conn driver.Conn) ([]Test, error) {
return backoff.RetryNotifyWithData(
func() ([]Test, error) {
err := conn.Exec(context.Background(), "create table if not exists test_table (id UInt64) engine = MergeTree PRIMARY KEY (id) ORDER BY (id) SETTINGS index_granularity = 8192;")
if err != nil {
return nil, err
}

return nil
}, backoff.NewExponentialBackOff())
err = conn.Exec(context.Background(), "INSERT INTO test_table (id) VALUES (1);")
if err != nil {
return nil, err
}

return rows, err
return getAllRows(conn)
},
backoff.NewExponentialBackOff(),
func(err error, duration time.Duration) {
t.Log(err)
},
)
}

func getAllRows(conn driver.Conn) ([]Test, error) {
Expand Down
Loading