Skip to content

Commit

Permalink
support gcs, part1
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Dec 29, 2022
1 parent d9c1cb5 commit 2f95fc9
Show file tree
Hide file tree
Showing 19 changed files with 263 additions and 251 deletions.
3 changes: 2 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/cdc/redo"
redoCommon "github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -674,7 +675,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
log.Warn("changefeed is removed, but state is not complete", zap.Any("state", c.state))
return
}
if !redo.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
if !redoCommon.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
return
}
// when removing a paused changefeed, the redo manager is nil, create a new one
Expand Down
34 changes: 0 additions & 34 deletions cdc/redo/applier.go

This file was deleted.

2 changes: 1 addition & 1 deletion cdc/redo/convert.go → cdc/redo/common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package redo
package common

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package redo
package common

import (
"testing"
Expand Down
71 changes: 71 additions & 0 deletions cdc/redo/common/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package common

// ConsistentLevelType is the level of redo log consistent level.
type ConsistentLevelType string

const (
	// ConsistentLevelNone no consistent guarantee.
	ConsistentLevelNone ConsistentLevelType = "none"
	// ConsistentLevelEventual eventual consistent.
	ConsistentLevelEventual ConsistentLevelType = "eventual"
)

// ConsistentStorage is the storage type of redo log, i.e. the scheme of the
// storage URI configured for the consistent feature.
type ConsistentStorage string

const (
	// ConsistentStorageBlackhole drops redo logs; used in tests only.
	ConsistentStorageBlackhole ConsistentStorage = "blackhole"
	// ConsistentStorageLocal stores redo logs on the local filesystem.
	ConsistentStorageLocal ConsistentStorage = "local"
	// ConsistentStorageNFS stores redo logs on an NFS mount.
	ConsistentStorageNFS ConsistentStorage = "nfs"
	// ConsistentStorageS3 stores redo logs in Amazon S3 (external storage).
	ConsistentStorageS3 ConsistentStorage = "s3"
	// ConsistentStorageGCS stores redo logs in Google Cloud Storage
	// (external storage).
	ConsistentStorageGCS ConsistentStorage = "gcs"
)

// IsValidConsistentLevel checks whether a given consistent level is valid
// IsValidConsistentLevel checks whether a given consistent level is valid.
func IsValidConsistentLevel(level string) bool {
	lvl := ConsistentLevelType(level)
	return lvl == ConsistentLevelNone || lvl == ConsistentLevelEventual
}

// IsConsistentEnabled returns whether the consistent feature is enabled.
// It is enabled only for a valid, non-"none" consistent level.
func IsConsistentEnabled(level string) bool {
	if !IsValidConsistentLevel(level) {
		return false
	}
	return ConsistentLevelType(level) != ConsistentLevelNone
}

// IsValidConsistentStorage checks whether a given consistent storage
// scheme is valid, i.e. one of the supported redo log storage backends
// (blackhole, local, nfs, s3, gcs).
func IsValidConsistentStorage(scheme string) bool {
	switch ConsistentStorage(scheme) {
	case ConsistentStorageBlackhole,
		ConsistentStorageLocal, ConsistentStorageNFS,
		ConsistentStorageS3, ConsistentStorageGCS:
		return true
	default:
		return false
	}
}

// IsExternalStorage returns whether external storage, such as s3 and gcs, is used.
func IsExternalStorage(storage string) bool {
	st := ConsistentStorage(storage)
	return st == ConsistentStorageS3 || st == ConsistentStorageGCS
}
153 changes: 25 additions & 128 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ package redo
import (
"context"
"math"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
Expand All @@ -38,64 +36,13 @@ import (
)

var (
// flushIntervalInMs is the minimum value of flush interval
flushIntervalInMs int64 = 2000 // 2 seconds
flushTimeout = time.Second * 20
flushIntervalInMs int64
flushTimeout = time.Second * 20

// Redo Manager GC interval. It can be changed in tests.
defaultGCIntervalInMs = 5000 // 5 seconds
)

// ConsistentLevelType is the level of redo log consistent level.
type ConsistentLevelType string

const (
// ConsistentLevelNone no consistent guarantee.
ConsistentLevelNone ConsistentLevelType = "none"
// ConsistentLevelEventual eventual consistent.
ConsistentLevelEventual ConsistentLevelType = "eventual"
)

type consistentStorage string

const (
consistentStorageLocal consistentStorage = "local"
consistentStorageNFS consistentStorage = "nfs"
consistentStorageS3 consistentStorage = "s3"
consistentStorageBlackhole consistentStorage = "blackhole"
)

// IsValidConsistentLevel checks whether a given consistent level is valid
func IsValidConsistentLevel(level string) bool {
switch ConsistentLevelType(level) {
case ConsistentLevelNone, ConsistentLevelEventual:
return true
default:
return false
}
}

// IsValidConsistentStorage checks whether a give consistent storage is valid
func IsValidConsistentStorage(storage string) bool {
switch consistentStorage(storage) {
case consistentStorageLocal, consistentStorageNFS,
consistentStorageS3, consistentStorageBlackhole:
return true
default:
return false
}
}

// IsConsistentEnabled returns whether the consistent feature is enabled
func IsConsistentEnabled(level string) bool {
return IsValidConsistentLevel(level) && ConsistentLevelType(level) != ConsistentLevelNone
}

// IsS3StorageEnabled returns whether s3 storage is enabled
func IsS3StorageEnabled(storage string) bool {
return consistentStorage(storage) == consistentStorageS3
}

// LogManager defines an interface that is used to manage redo log
type LogManager interface {
// Enabled returns whether the log manager is enabled
Expand Down Expand Up @@ -170,8 +117,6 @@ func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) {
type ManagerImpl struct {
changeFeedID model.ChangeFeedID
enabled bool
level ConsistentLevelType
storageType consistentStorage

opts *ManagerOptions

Expand Down Expand Up @@ -207,89 +152,39 @@ type ManagerImpl struct {
// NewManager creates a new Manager
func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *ManagerOptions) (*ManagerImpl, error) {
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || ConsistentLevelType(cfg.Level) == ConsistentLevelNone {
if cfg == nil || !common.IsConsistentEnabled(cfg.Level) {
return &ManagerImpl{enabled: false}, nil
}
if cfg.FlushIntervalInMs > flushIntervalInMs {
flushIntervalInMs = cfg.FlushIntervalInMs
}
flushIntervalInMs = cfg.FlushIntervalInMs

uri, err := storage.ParseRawURL(cfg.Storage)
writer, err := writer.NewRedoLogWriter(ctx, cfg, opts.EmitMeta,
opts.EmitRowEvents, opts.EmitDDLEvents)
if err != nil {
return nil, err
}

changeFeedID := contextutil.ChangefeedIDFromCtx(ctx)
m := &ManagerImpl{
changeFeedID: changeFeedID,
changeFeedID: contextutil.ChangefeedIDFromCtx(ctx),
enabled: true,
level: ConsistentLevelType(cfg.Level),
storageType: consistentStorage(uri.Scheme),
opts: opts,
rtsMap: sync.Map{},
minResolvedTs: math.MaxInt64,

metricWriteLogDuration: common.RedoWriteLogDurationHistogram.
WithLabelValues(changeFeedID.Namespace, changeFeedID.ID),
metricFlushLogDuration: common.RedoFlushLogDurationHistogram.
WithLabelValues(changeFeedID.Namespace, changeFeedID.ID),
writer: writer,
}

switch m.storageType {
case consistentStorageBlackhole:
m.writer = writer.NewBlackHoleWriter()
case consistentStorageLocal, consistentStorageNFS, consistentStorageS3:
globalConf := config.GetGlobalServerConfig()
// When an external storage such S3 is used, we use redoDir as a temporary dir to store redo logs
// before we flush them to S3.
var redoDir string
if changeFeedID.Namespace == model.DefaultNamespace {
redoDir = filepath.Join(globalConf.DataDir,
config.DefaultRedoDir, changeFeedID.ID)
} else {
redoDir = filepath.Join(globalConf.DataDir,
config.DefaultRedoDir,
changeFeedID.Namespace, changeFeedID.ID)
}

// When local storage or NFS is used, we use redoDir as the final storage path.
if m.storageType == consistentStorageLocal || m.storageType == consistentStorageNFS {
redoDir = uri.Path
}

writerCfg := &writer.LogWriterConfig{
Dir: redoDir,
CaptureID: contextutil.CaptureAddrFromCtx(ctx),
ChangeFeedID: changeFeedID,
CreateTime: time.Now(),
MaxLogSize: cfg.MaxLogSize,
FlushIntervalInMs: cfg.FlushIntervalInMs,
S3Storage: m.storageType == consistentStorageS3,

EmitMeta: m.opts.EmitMeta,
EmitRowEvents: m.opts.EmitRowEvents,
EmitDDLEvents: m.opts.EmitDDLEvents,
}
if writerCfg.S3Storage {
writerCfg.S3URI = *uri
}
writer, err := writer.NewLogWriter(ctx, writerCfg)
if err != nil {
return nil, err
}
m.writer = writer

if m.opts.EmitMeta {
checkpointTs, resolvedTs := m.writer.GetMeta()
m.metaCheckpointTs.flushed = checkpointTs
m.metaCheckpointTs.unflushed = checkpointTs
m.metaResolvedTs.flushed = resolvedTs
m.metaResolvedTs.unflushed = resolvedTs
}
default:
return nil, cerror.ErrConsistentStorage.GenWithStackByArgs(m.storageType)
if m.opts.EmitMeta {
checkpointTs, resolvedTs := m.writer.GetMeta()
m.metaCheckpointTs.flushed = checkpointTs
m.metaCheckpointTs.unflushed = checkpointTs
m.metaResolvedTs.flushed = resolvedTs
m.metaResolvedTs.unflushed = resolvedTs
}

m.metricWriteLogDuration = common.RedoWriteLogDurationHistogram.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)

// TODO: better to wait background goroutines after the context is canceled.
if m.opts.EnableBgRunner {
m.logBuffer = chann.New[cacheEvents]()
Expand All @@ -310,7 +205,7 @@ func NewDisabledManager() *ManagerImpl {
// NewMockManager returns a mock redo manager instance, used in test only
func NewMockManager(ctx context.Context) (*ManagerImpl, error) {
cfg := &config.ConsistentConfig{
Level: string(ConsistentLevelEventual),
Level: string(common.ConsistentLevelEventual),
Storage: "blackhole://",
}

Expand Down Expand Up @@ -396,7 +291,9 @@ func (m *ManagerImpl) UpdateCheckpointTs(ckpt model.Ts) {

// EmitDDLEvent sends DDL event to redo log writer
func (m *ManagerImpl) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
return m.withLock(func(m *ManagerImpl) error { return m.writer.SendDDL(ctx, DDLToRedo(ddl)) })
return m.withLock(func(m *ManagerImpl) error {
return m.writer.SendDDL(ctx, common.DDLToRedo(ddl))
})
}

// GetResolvedTs returns the resolved ts of a table
Expand Down Expand Up @@ -684,7 +581,7 @@ func (m *ManagerImpl) bgUpdateLog(ctx context.Context, errCh chan<- error) {
switch cache.eventType {
case model.MessageTypeRow:
for _, row := range cache.rows {
logs = append(logs, RowToRedo(row))
logs = append(logs, common.RowToRedo(row))
}
if cache.releaseMemory != nil {
releaseMemoryCbs = append(releaseMemoryCbs, cache.releaseMemory)
Expand Down Expand Up @@ -714,7 +611,7 @@ func (m *ManagerImpl) bgUpdateLog(ctx context.Context, errCh chan<- error) {
switch cache.eventType {
case model.MessageTypeRow:
for _, row := range cache.rows {
logs = append(logs, RowToRedo(row))
logs = append(logs, common.RowToRedo(row))
}
if cache.releaseMemory != nil {
releaseMemoryCbs = append(releaseMemoryCbs, cache.releaseMemory)
Expand Down
Loading

0 comments on commit 2f95fc9

Please sign in to comment.