diff --git a/cdc/server/server.go b/cdc/server/server.go index 9c280e8acff..d26fb0db923 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/util/gctuner" "github.com/pingcap/tiflow/cdc" "github.com/pingcap/tiflow/cdc/capture" + "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/processor/pipeline/system" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory" @@ -282,23 +283,23 @@ func (s *server) startActorSystems(ctx context.Context) error { } // Run runs the server. -func (s *server) Run(ctx context.Context) error { - if err := s.prepare(ctx); err != nil { +func (s *server) Run(serverCtx context.Context) error { + if err := s.prepare(serverCtx); err != nil { return err } - err := s.startStatusHTTP(s.tcpServer.HTTP1Listener()) + err := s.startStatusHTTP(serverCtx, s.tcpServer.HTTP1Listener()) if err != nil { return err } - return s.run(ctx) + return s.run(serverCtx) } // startStatusHTTP starts the HTTP server. // `lis` is a listener that gives us plain-text HTTP requests. // TODO: can we decouple the HTTP server from the capture server? -func (s *server) startStatusHTTP(lis net.Listener) error { +func (s *server) startStatusHTTP(serverCtx context.Context, lis net.Listener) error { // LimitListener returns a Listener that accepts at most n simultaneous // connections from the provided Listener. Connections that exceed the // limit will wait in a queue and no new goroutines will be created until @@ -322,6 +323,10 @@ func (s *server) startStatusHTTP(lis net.Listener) error { Handler: router, ReadTimeout: httpConnectionTimeout, WriteTimeout: httpConnectionTimeout, + BaseContext: func(listener net.Listener) context.Context { + return contextutil.PutTimezoneInCtx(context.Background(), + contextutil.TimezoneFromCtx(serverCtx)) + }, } go func() { diff --git a/cdc/server/server_test.go b/cdc/server/server_test.go index 6cc83a53199..e4366aa84eb 100644 --- a/cdc/server/server_test.go +++ b/cdc/server/server_test.go @@ -190,7 +190,7 @@ func TestServerTLSWithoutCommonName(t *testing.T) { cp.EtcdClient = etcdClient server.capture = cp require.Nil(t, err) - err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) + err = server.startStatusHTTP(context.TODO(), server.tcpServer.HTTP1Listener()) require.Nil(t, err) defer func() { require.Nil(t, server.statusServer.Close()) @@ -277,7 +277,7 @@ func TestServerTLSWithCommonNameAndRotate(t *testing.T) { cp.EtcdClient = etcdClient server.capture = cp require.Nil(t, err) - err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) + err = server.startStatusHTTP(context.TODO(), server.tcpServer.HTTP1Listener()) require.Nil(t, err) defer func() { require.Nil(t, server.statusServer.Close()) diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 24dc0506f94..de744872894 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -30,6 +30,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -357,28 +358,48 @@ func getSafeMode(values url.Values, safeMode *bool) error { return nil } -func getTimezone(ctx context.Context, values url.Values, timezone *string) error { +func getTimezone(ctxWithTimezone context.Context, values url.Values, timezone *string) error { + const pleaseSpecifyTimezone = "We recommend that you specify the time-zone explicitly. " + + "Please make sure that the timezone of the TiCDC server, " + + "sink-uri and the downstream database are consistent. " + + "If the downstream database does not load the timezone information, " + + "you can refer to https://dev.mysql.com/doc/refman/8.0/en/mysql-tzinfo-to-sql.html." + serverTimezone := contextutil.TimezoneFromCtx(ctxWithTimezone) if _, ok := values["time-zone"]; !ok { - tz := contextutil.TimezoneFromCtx(ctx) - *timezone = fmt.Sprintf(`"%s"`, tz.String()) + // If time-zone is not specified, use the timezone of the server. + log.Warn("Because time-zone is not specified, "+ + "the timezone of the TiCDC server will be used. "+ + pleaseSpecifyTimezone, + zap.String("timezone", serverTimezone.String())) + *timezone = fmt.Sprintf(`"%s"`, serverTimezone.String()) return nil } s := values.Get("time-zone") if len(s) == 0 { *timezone = "" + log.Warn("Because time-zone is empty, " + + "the timezone of the downstream database will be used. " + + pleaseSpecifyTimezone) return nil } - value, err := url.QueryUnescape(s) + changefeedTimezone, err := util.GetTimezone(s) if err != nil { return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) } - _, err = time.LoadLocation(value) - if err != nil { - return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + *timezone = fmt.Sprintf(`"%s"`, changefeedTimezone.String()) + // We need to check whether the timezone of the TiCDC server and the sink-uri are consistent. + // If they are inconsistent, it may cause the data to be inconsistent. + if changefeedTimezone.String() != serverTimezone.String() { + return cerror.WrapError(cerror.ErrMySQLInvalidConfig, errors.Errorf( + "the timezone of the TiCDC server and the sink-uri are inconsistent. "+ + "TiCDC server timezone: %s, sink-uri timezone: %s. "+ + "Please make sure that the timezone of the TiCDC server, "+ + "sink-uri and the downstream database are consistent.", + serverTimezone.String(), changefeedTimezone.String())) } - *timezone = fmt.Sprintf(`"%s"`, s) + return nil } diff --git a/pkg/sink/mysql/config_test.go b/pkg/sink/mysql/config_test.go index 9ba903f841a..afbfd127e7e 100644 --- a/pkg/sink/mysql/config_test.go +++ b/pkg/sink/mysql/config_test.go @@ -19,11 +19,14 @@ import ( "net/url" "strings" "testing" + "time" "github.com/DATA-DOG/go-sqlmock" dmysql "github.com/go-sql-driver/mysql" + "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -201,32 +204,6 @@ func TestApplySinkURIParamsToConfig(t *testing.T) { require.Equal(t, expected, cfg) } -func TestParseSinkURITimezone(t *testing.T) { - t.Parallel() - - uris := []string{ - "mysql://127.0.0.1:3306/?time-zone=Asia/Shanghai&worker-count=32", - "mysql://127.0.0.1:3306/?time-zone=&worker-count=32", - "mysql://127.0.0.1:3306/?worker-count=32", - } - expected := []string{ - "\"Asia/Shanghai\"", - "", - "\"UTC\"", - } - ctx := context.TODO() - for i, uriStr := range uris { - uri, err := url.Parse(uriStr) - require.Nil(t, err) - cfg := NewConfig() - err = cfg.Apply(ctx, - model.DefaultChangeFeedID("cf"), - uri, config.GetDefaultReplicaConfig()) - require.Nil(t, err) - require.Equal(t, expected[i], cfg.Timezone) - } -} - func TestParseSinkURIOverride(t *testing.T) { t.Parallel() @@ -341,3 +318,91 @@ func TestCheckTiDBVariable(t *testing.T) { require.NotNil(t, err) require.Regexp(t, ".*"+sql.ErrConnDone.Error(), err.Error()) } + +func TestApplyTimezone(t *testing.T) { + t.Parallel() + + localTimezone, err := util.GetTimezone("Local") + require.Nil(t, err) + + for _, test := range []struct { + name string + noChangefeedTimezone bool + changefeedTimezone string + serverTimezone *time.Location + expected string + expectedHasErr bool + expectedErr string + }{ + { + name: "no changefeed timezone", + noChangefeedTimezone: true, + serverTimezone: time.UTC, + expected: "\"UTC\"", + expectedHasErr: false, + }, + { + name: "empty changefeed timezone", + noChangefeedTimezone: false, + changefeedTimezone: "", + serverTimezone: time.UTC, + expected: "", + expectedHasErr: false, + }, + { + name: "normal changefeed timezone", + noChangefeedTimezone: false, + changefeedTimezone: "UTC", + serverTimezone: time.UTC, + expected: "\"UTC\"", + expectedHasErr: false, + }, + { + name: "local timezone", + noChangefeedTimezone: false, + changefeedTimezone: "Local", + serverTimezone: localTimezone, + expected: "\"" + localTimezone.String() + "\"", + expectedHasErr: false, + }, + { + name: "sink-uri timezone different from server timezone", + noChangefeedTimezone: false, + changefeedTimezone: "UTC", + serverTimezone: localTimezone, + expectedHasErr: true, + expectedErr: "Please make sure that the timezone of the TiCDC server", + }, + { + name: "unsupported timezone format", + noChangefeedTimezone: false, + changefeedTimezone: "%2B08%3A00", // +08:00 + serverTimezone: time.UTC, + expectedHasErr: true, + expectedErr: "unknown time zone +08:00", + }, + } { + tc := test + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + cfg := NewConfig() + ctx := contextutil.PutTimezoneInCtx(context.Background(), tc.serverTimezone) + sinkURI := "mysql://127.0.0.1:3306" + if !tc.noChangefeedTimezone { + sinkURI = sinkURI + "?time-zone=" + tc.changefeedTimezone + } + uri, err := url.Parse(sinkURI) + require.Nil(t, err) + err = cfg.Apply(ctx, + model.DefaultChangeFeedID("changefeed-01"), uri, config.GetDefaultReplicaConfig()) + if tc.expectedHasErr { + require.NotNil(t, err) + require.Contains(t, err.Error(), tc.expectedErr) + } else { + require.Nil(t, err) + require.Equal(t, tc.expected, cfg.Timezone) + } + }) + } +} diff --git a/pkg/util/tz.go b/pkg/util/tz.go index fbdc2a1bd0e..daaf53bbd91 100644 --- a/pkg/util/tz.go +++ b/pkg/util/tz.go @@ -18,8 +18,10 @@ import ( "strings" "time" + "github.com/pingcap/log" "github.com/pingcap/tidb/util/timeutil" cerror "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" ) // GetTimezone returns the timezone specified by the name @@ -28,9 +30,19 @@ func GetTimezone(name string) (tz *time.Location, err error) { case "", "system", "local": tz, err = GetLocalTimezone() err = cerror.WrapError(cerror.ErrLoadTimezone, err) + if err == nil { + log.Info("Use the timezone of the TiCDC server machine", + zap.String("timezoneName", name), + zap.String("timezone", tz.String())) + } default: tz, err = time.LoadLocation(name) err = cerror.WrapError(cerror.ErrLoadTimezone, err) + if err == nil { + log.Info("Load the timezone specified by the user", + zap.String("timezoneName", name), + zap.String("timezone", tz.String())) + } } return }