Skip to content

Commit

Permalink
feat(query): add RaftReadBarrier
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal-Leszczynski authored and karol-kokoszka committed Jun 21, 2024
1 parent c3728e2 commit 39358fa
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions pkg/util/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
package query

import (
"fmt"
"strings"

"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/gocqlx/v2/qb"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

// GetAllViews returns set of all views present in cluster.
Expand Down Expand Up @@ -78,3 +81,30 @@ func DescribeSchemaWithInternals(session gocqlx.Session) (DescribedSchema, error
}
return schema, nil
}

// RaftReadBarrier performs read barrier on a random node to which session is connected to.
// When used with single host session, it can be used to ensure that given node
// has already applied all raft commands present in the raft log.
func RaftReadBarrier(session gocqlx.Session) error {
// As this functionality is not available from Scylla API,
// SM uses the workaround described in https://github.com/scylladb/scylladb/issues/19213.
// Read barrier is performed before any schema change, so it's enough to try to apply
// some dummy schema changing CQL statement.
// In order to avoid misleading errors, it's good to use the IF EXISTS clause.
// Still, it might happen that SM does not have the permissions to perform dummy schema change.
// As described in the issue, this type of error still results in performing read barrier,
// so SM should treat it as if everything went fine.
// TODO: swap workaround with proper API call when mentioned issue is fixed.
// nolint: godox
tmpName := strings.ReplaceAll(uuid.NewTime().String(), "-", "")
readBarrierStmt := fmt.Sprintf("DROP TABLE IF EXISTS %q.%q", tmpName, tmpName)
if err := session.ExecStmt(readBarrierStmt); err != nil && !isGocqlUnauthorizedError(err) {
return errors.Wrap(err, "exec dummy schema change")
}
return nil
}

func isGocqlUnauthorizedError(err error) bool {
var reqErr gocql.RequestError
return errors.As(err, &reqErr) && reqErr.Code() == gocql.ErrCodeUnauthorized
}

0 comments on commit 39358fa

Please sign in to comment.