Skip to content

Commit

Permalink
Bug fix for loading persisted server_id from config.json as string
Browse files Browse the repository at this point in the history
extra connection error logging
  • Loading branch information
fulghum committed Jul 26, 2024
1 parent 0a07d07 commit 2d07963
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,12 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co

conn, err = mysql.Connect(ctx, &connParams)
if err != nil {
logrus.Warnf("failed connection attempt to source (%s): %s",
replicaSourceInfo.Host, err.Error())

if connectionAttempts >= maxConnectionAttempts {
ctx.GetLogger().Errorf("Exceeded max connection attempts (%d) to source server", maxConnectionAttempts)
ctx.GetLogger().Errorf("Exceeded max connection attempts (%d) to source (%s)",
maxConnectionAttempts, replicaSourceInfo.Host)
return nil, err
}
// If there was an error connecting (and we haven't used up all our retry attempts), listen for a
Expand Down Expand Up @@ -916,12 +920,23 @@ func getAllUserDatabaseNames(ctx *sql.Context, engine *gms.Engine) []string {
// loadReplicaServerId loads the @@GLOBAL.server_id system variable needed to register the replica with the source,
// and returns an error specific to replication configuration if the variable is not set to a valid value.
func loadReplicaServerId() (uint32, error) {
_, value, ok := sql.SystemVariables.GetGlobal("server_id")
serverIdVar, value, ok := sql.SystemVariables.GetGlobal("server_id")
if !ok {
return 0, fmt.Errorf("no server_id global system variable set")
}

// Persisted values stored in .dolt/config.json can cause string values to be stored in
// system variables, so attempt to convert the value if we can't directly cast it to a uint32.
serverId, ok := value.(uint32)
if !ok {
var err error
value, _, err = serverIdVar.GetType().Convert(value)
if err != nil {
return 0, err
}
}

serverId, ok = value.(uint32)
if !ok || serverId == 0 {
return 0, fmt.Errorf("invalid server ID configured for @@GLOBAL.server_id (%v); "+
"must be an integer greater than zero and less than 4,294,967,296", serverId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// data types can be successfully replicated.
func TestBinlogReplicationForAllTypes(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// Set the session's timezone to UTC, to avoid TIMESTAMP test values changing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// Ignore replication events for db01.t2. Also tests that the first filter setting is overwritten by
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// Do replication events for db01.t1. Also tests that the first filter setting is overwritten by
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
// replication filtering options are correctly applied and honored when used together.
func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// Do replication events for db01.t1, and db01.t2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// applied by a replica.
func TestBinlogReplicationMultiDb(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// Make changes on the primary to db01 and db02
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestBinlogReplicationMultiDb(t *testing.T) {
// multiple DBs are applied correctly to a replica.
func TestBinlogReplicationMultiDbTransactions(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// Make changes on the primary to db01 and db02
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var proxyPort int
// reestablished if it drops.
func TestBinlogReplicationAutoReconnect(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
configureToxiProxy(t)
configureFastConnectionRetry(t)
startReplication(t, proxyPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// server process can be restarted and replica can be restarted without problems.
func TestBinlogReplicationServerRestart(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

primaryDatabase.MustExec("create table t (pk int auto_increment primary key)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ var doltLogFile, mysqlLogFile *os.File
var testDir string
var originalWorkingDir string

// doltReplicaSystemVars are the common system variables that need
// to be set on a Dolt replica before replication is turned on.
var doltReplicaSystemVars = map[string]string{
"server_id": "42",
}

func teardown(t *testing.T) {
if mySqlProcess != nil {
stopMySqlServer(t)
Expand Down Expand Up @@ -92,7 +98,7 @@ func teardown(t *testing.T) {
// Dolt replica.
func TestBinlogReplicationSanityCheck(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// Make changes on the primary and verify on the replica
Expand Down Expand Up @@ -129,7 +135,7 @@ func TestBinlogSystemUserIsLocked(t *testing.T) {
// process the events without errors.
func TestFlushLogs(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// Make changes on the primary and verify on the replica
Expand All @@ -153,7 +159,7 @@ func TestFlushLogs(t *testing.T) {
// replication configuration and metadata.
func TestResetReplica(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// RESET REPLICA returns an error if replication is running
Expand Down Expand Up @@ -205,7 +211,7 @@ func TestResetReplica(t *testing.T) {
// for various error conditions.
func TestStartReplicaErrors(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)

// START REPLICA returns an error when no replication source is configured
_, err := replicaDatabase.Queryx("START REPLICA;")
Expand Down Expand Up @@ -248,14 +254,13 @@ func TestShowReplicaStatus(t *testing.T) {
// warnings are logged when STOP REPLICA is invoked when replication is not running.
func TestStopReplica(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)

// STOP REPLICA logs a warning if replication is not running
replicaDatabase.MustExec("STOP REPLICA;")
assertWarning(t, replicaDatabase, 3084, "Replication thread(s) for channel '' are already stopped.")

// Start replication with bad connection params
replicaDatabase.MustExec("SET @@GLOBAL.server_id=52;")
replicaDatabase.MustExec("CHANGE REPLICATION SOURCE TO SOURCE_HOST='doesnotexist', SOURCE_PORT=111, SOURCE_USER='nobody';")
replicaDatabase.MustExec("START REPLICA;")
time.Sleep(200 * time.Millisecond)
Expand Down Expand Up @@ -290,7 +295,7 @@ func TestStopReplica(t *testing.T) {
// TestDoltCommits tests that Dolt commits are created and use correct transaction boundaries.
func TestDoltCommits(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// First transaction (DDL)
Expand Down Expand Up @@ -370,7 +375,7 @@ func TestDoltCommits(t *testing.T) {
// enabled and disabled.
func TestForeignKeyChecks(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// Test that we can execute statement-based replication that requires foreign_key_checks
Expand Down Expand Up @@ -427,7 +432,7 @@ func TestForeignKeyChecks(t *testing.T) {
// TestCharsetsAndCollations tests that we can successfully replicate data using various charsets and collations.
func TestCharsetsAndCollations(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplication(t, mySqlPort)

// Use non-default charset/collations to create data on the primary
Expand Down Expand Up @@ -691,7 +696,6 @@ func startReplication(t *testing.T, port int) {
// pauses for |delay| before creating the test database, db01, on the primary, and ensures it
// gets replicated to the replica.
func startReplicationWithDelay(t *testing.T, port int, delay time.Duration) {
replicaDatabase.MustExec("SET @@GLOBAL.server_id=123;")
replicaDatabase.MustExec(
fmt.Sprintf("change replication source to SOURCE_HOST='localhost', "+
"SOURCE_USER='replicator', SOURCE_PASSWORD='Zqr8_blrGm1!', "+
Expand Down

0 comments on commit 2d07963

Please sign in to comment.