Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

*: avoid accessing internal ports when backend=tidb #312

Merged
merged 2 commits into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
uuid "github.com/satori/go.uuid"
Expand Down Expand Up @@ -115,6 +116,24 @@ type AbstractBackend interface {
ImportEngine(ctx context.Context, engineUUID uuid.UUID) error

CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

// CheckRequirements performs the check whether the backend satisfies the
// version requirements
CheckRequirements() error

// FetchRemoteTableModels obtains the models of all tables given the schema
// name. The returned table info does not need to be precise if the encoder,
// is not requiring them, but must at least fill in the following fields for
// TablesFromMeta to succeed:
// - Name
// - State (must be model.StatePublic)
// - ID
// - Columns
// * Name
// * State (must be model.StatePublic)
// * Offset (must be 0, 1, 2, ...)
// - PKIsHandle (true = do not generate _tidb_rowid)
FetchRemoteTableModels(schemaName string) ([]*model.TableInfo, error)
}

// Backend is the delivery target for Lightning
Expand Down Expand Up @@ -170,6 +189,14 @@ func (be Backend) ShouldPostProcess() bool {
return be.abstract.ShouldPostProcess()
}

func (be Backend) CheckRequirements() error {
return be.abstract.CheckRequirements()
}

func (be Backend) FetchRemoteTableModels(schemaName string) ([]*model.TableInfo, error) {
return be.abstract.FetchRemoteTableModels(schemaName)
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) {
tag := makeTag(tableName, engineID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package restore
package backend

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strconv"

"github.com/coreos/go-semver/semver"
. "github.com/pingcap/check"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
)

var _ = Suite(&checkReqSuite{})
Expand Down Expand Up @@ -93,26 +91,14 @@ func (s *checkReqSuite) TestCheckTiDBVersion(c *C) {
})
c.Assert(err, IsNil)
}))
mockURL, err := url.Parse(mockServer.URL)
c.Assert(err, IsNil)
mockPort, err := strconv.Atoi(mockURL.Port())
c.Assert(err, IsNil)

rc := &RestoreController{
cfg: &config.Config{
TiDB: config.DBStore{
Host: mockURL.Hostname(),
StatusPort: mockPort,
},
},
tls: common.NewTLSFromMockServer(mockServer),
}
tls := common.NewTLSFromMockServer(mockServer)

version = "5.7.25-TiDB-v9999.0.0"
c.Assert(rc.checkTiDBVersion(), IsNil)
c.Assert(checkTiDBVersion(tls, requiredTiDBVersion), IsNil)

version = "5.7.25-TiDB-v1.0.0"
c.Assert(rc.checkTiDBVersion(), ErrorMatches, "TiDB version too old.*")
c.Assert(checkTiDBVersion(tls, requiredTiDBVersion), ErrorMatches, "TiDB version too old.*")
}

func (s *checkReqSuite) TestCheckPDVersion(c *C) {
Expand All @@ -127,20 +113,13 @@ func (s *checkReqSuite) TestCheckPDVersion(c *C) {
mockURL, err := url.Parse(mockServer.URL)
c.Assert(err, IsNil)

rc := &RestoreController{
cfg: &config.Config{
TiDB: config.DBStore{
PdAddr: mockURL.Host,
},
},
tls: common.NewTLSFromMockServer(mockServer),
}
tls := common.NewTLSFromMockServer(mockServer)

version = "9999.0.0"
c.Assert(rc.checkPDVersion(), IsNil)
c.Assert(checkPDVersion(tls, mockURL.Host, requiredPDVersion), IsNil)

version = "1.0.0"
c.Assert(rc.checkPDVersion(), ErrorMatches, "PD version too old.*")
c.Assert(checkPDVersion(tls, mockURL.Host, requiredPDVersion), ErrorMatches, "PD version too old.*")
}

func (s *checkReqSuite) TestCheckTiKVVersion(c *C) {
Expand Down Expand Up @@ -168,21 +147,14 @@ func (s *checkReqSuite) TestCheckTiKVVersion(c *C) {
mockURL, err := url.Parse(mockServer.URL)
c.Assert(err, IsNil)

rc := &RestoreController{
cfg: &config.Config{
TiDB: config.DBStore{
PdAddr: mockURL.Host,
},
},
tls: common.NewTLSFromMockServer(mockServer),
}
tls := common.NewTLSFromMockServer(mockServer)

versions = []string{"9999.0.0", "9999.0.0"}
c.Assert(rc.checkTiKVVersion(), IsNil)
c.Assert(checkTiKVVersion(tls, mockURL.Host, requiredTiKVVersion), IsNil)

versions = []string{"4.1.0", "v4.1.0-alpha-9-ga27a7dd"}
c.Assert(rc.checkTiKVVersion(), IsNil)
c.Assert(checkTiKVVersion(tls, mockURL.Host, requiredTiKVVersion), IsNil)

versions = []string{"9999.0.0", "1.0.0"}
c.Assert(rc.checkTiKVVersion(), ErrorMatches, `TiKV \(at tikv1\.test:20160\) version too old.*`)
c.Assert(checkTiKVVersion(tls, mockURL.Host, requiredTiKVVersion), ErrorMatches, `TiKV \(at tikv1\.test:20160\) version too old.*`)
}
115 changes: 115 additions & 0 deletions lightning/backend/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ package backend

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
kv "github.com/pingcap/kvproto/pkg/import_kvpb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/table"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
Expand All @@ -34,12 +37,20 @@ const (
defaultRetryBackoffTime = time.Second * 3
)

var (
requiredTiDBVersion = *semver.New("2.1.0")
requiredPDVersion = *semver.New("2.1.0")
requiredTiKVVersion = *semver.New("2.1.0")
// ^ TODO: remember to use 4.0.0 for "local" backend.
)

// importer represents a gRPC connection to tikv-importer. This type is
// goroutine safe: you can share this instance and execute any method anywhere.
type importer struct {
conn *grpc.ClientConn
cli kv.ImportKVClient
pdAddr string
tls *common.TLS

mutationPool sync.Pool
}
Expand All @@ -56,6 +67,7 @@ func NewImporter(ctx context.Context, tls *common.TLS, importServerAddr string,
conn: conn,
cli: kv.NewImportKVClient(conn),
pdAddr: pdAddr,
tls: tls,
mutationPool: sync.Pool{New: func() interface{} { return &kv.Mutation{} }},
}), nil
}
Expand Down Expand Up @@ -228,3 +240,106 @@ func (*importer) MakeEmptyRows() Rows {
func (*importer) NewEncoder(tbl table.Table, options *SessionOptions) Encoder {
return NewTableKVEncoder(tbl, options)
}

func (importer *importer) CheckRequirements() error {
if err := checkTiDBVersion(importer.tls, requiredTiDBVersion); err != nil {
return err
}
if err := checkPDVersion(importer.tls, importer.pdAddr, requiredPDVersion); err != nil {
return err
}
if err := checkTiKVVersion(importer.tls, importer.pdAddr, requiredTiKVVersion); err != nil {
return err
}
return nil
}

func extractTiDBVersion(version string) (*semver.Version, error) {
// version format: "5.7.10-TiDB-v2.1.0-rc.1-7-g38c939f"
// ^~~~~~~~~^ we only want this part
// version format: "5.7.10-TiDB-v2.0.4-1-g06a0bf5"
// ^~~~^
// version format: "5.7.10-TiDB-v2.0.7"
// ^~~~^
// version format: "5.7.25-TiDB-v3.0.0-beta-211-g09beefbe0-dirty"
// ^~~~~~~~~^
// The version is generated by `git describe --tags` on the TiDB repository.
versions := strings.Split(strings.TrimSuffix(version, "-dirty"), "-")
end := len(versions)
switch end {
case 3, 4:
case 5, 6:
end -= 2
default:
return nil, errors.Errorf("not a valid TiDB version: %s", version)
}
rawVersion := strings.Join(versions[2:end], "-")
rawVersion = strings.TrimPrefix(rawVersion, "v")
return semver.NewVersion(rawVersion)
}

func checkTiDBVersion(tls *common.TLS, requiredVersion semver.Version) error {
var status struct{ Version string }
err := tls.GetJSON("/status", &status)
if err != nil {
return err
}

version, err := extractTiDBVersion(status.Version)
if err != nil {
return errors.Trace(err)
}
return checkVersion("TiDB", requiredVersion, *version)
}

func checkPDVersion(tls *common.TLS, pdAddr string, requiredVersion semver.Version) error {
var rawVersion string
err := tls.WithHost(pdAddr).GetJSON("/pd/api/v1/config/cluster-version", &rawVersion)
if err != nil {
return err
}

version, err := semver.NewVersion(rawVersion)
if err != nil {
return errors.Trace(err)
}

return checkVersion("PD", requiredVersion, *version)
}

func checkTiKVVersion(tls *common.TLS, pdAddr string, requiredVersion semver.Version) error {
return ForAllStores(
context.Background(),
tls.WithHost(pdAddr),
StoreStateDown,
func(c context.Context, store *Store) error {
component := fmt.Sprintf("TiKV (at %s)", store.Address)
version, err := semver.NewVersion(strings.TrimPrefix(store.Version, "v"))
if err != nil {
return errors.Annotate(err, component)
}
return checkVersion(component, requiredVersion, *version)
},
)
}

func checkVersion(component string, expected, actual semver.Version) error {
if actual.Compare(expected) >= 0 {
return nil
}
return errors.Errorf(
"%s version too old, expected '>=%s', found '%s'",
component,
expected,
actual,
)
}

func (importer *importer) FetchRemoteTableModels(schema string) ([]*model.TableInfo, error) {
var tables []*model.TableInfo
err := importer.tls.GetJSON("/schema/"+schema, &tables)
if err != nil {
return nil, errors.Annotatef(err, "cannot read schema '%s' from remote", schema)
}
return tables, nil
}
49 changes: 49 additions & 0 deletions lightning/backend/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -248,6 +249,11 @@ func (be *tidbBackend) ShouldPostProcess() bool {
return false
}

func (be *tidbBackend) CheckRequirements() error {
log.L().Info("skipping check requirements for tidb backend")
return nil
}

func (be *tidbBackend) NewEncoder(_ table.Table, options *SessionOptions) Encoder {
return tidbEncoder{mode: options.SQLMode}
}
Expand Down Expand Up @@ -314,3 +320,46 @@ func (be *tidbBackend) WriteRows(ctx context.Context, _ uuid.UUID, tableName str
})
return err
}

func (be *tidbBackend) FetchRemoteTableModels(schemaName string) (tables []*model.TableInfo, err error) {
s := common.SQLWithRetry{
DB: be.db,
Logger: log.L(),
}
err = s.Transact(context.Background(), "fetch table columns", func(c context.Context, tx *sql.Tx) error {
rows, e := tx.Query(`
SELECT table_name, group_concat(column_name SEPARATOR '\n')
FROM information_schema.columns
WHERE table_schema = ?
GROUP BY table_name;
`, schemaName)
if e != nil {
return e
}
defer rows.Close()

for rows.Next() {
var tableName, columnNamesConcat string
if e := rows.Scan(&tableName, &columnNamesConcat); e != nil {
return e
}
columnNames := strings.Split(columnNamesConcat, "\n")
columns := make([]*model.ColumnInfo, 0, len(columnNames))
for i, columnName := range columnNames {
columns = append(columns, &model.ColumnInfo{
Name: model.NewCIStr(columnName),
Offset: i,
State: model.StatePublic,
})
}
tables = append(tables, &model.TableInfo{
Name: model.NewCIStr(tableName),
Columns: columns,
State: model.StatePublic,
PKIsHandle: true,
})
}
return rows.Err()
})
return
}
5 changes: 3 additions & 2 deletions lightning/checkpoints/file_checkpoints.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions lightning/checkpoints/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ type TidbDBInfo struct {
}

type TidbTableInfo struct {
ID int64
Name string
Columns int
Indices int
Core *model.TableInfo
ID int64
Name string
Core *model.TableInfo
}
Loading