Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

loader: support percent escape in dump files (#980) #991

Merged
merged 3 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package loader

import (
"bufio"
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -153,7 +155,7 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
return
}
sqls := make([]string, 0, 3)
sqls = append(sqls, fmt.Sprintf("USE `%s`;", job.schema))
sqls = append(sqls, fmt.Sprintf("USE `%s`;", unescapePercent(job.schema, w.tctx.L())))
sqls = append(sqls, job.sql)

offsetSQL := w.checkPoint.GenSQL(job.file, job.offset)
Expand Down Expand Up @@ -547,11 +549,41 @@ func (l *Loader) closeFileJobQueue() {
l.fileJobQueueClosed.Set(true)
}

// unescapePercent decodes every percent escape ("%" followed by two hex
// digits) found in input and returns the decoded string.
// The escaping scheme aligns with https://github.com/pingcap/dumpling/pull/140.
// If input is malformed (a '%' that is not followed by two valid hex digits),
// an error is logged through logger and the original string is returned as-is.
func unescapePercent(input string, logger log.Logger) string {
	var decoded bytes.Buffer
	decoded.Grow(len(input))

	for pos := 0; pos < len(input); {
		ch := input[pos]
		if ch != '%' {
			// ordinary byte: copy through unchanged
			decoded.WriteByte(ch)
			pos++
			continue
		}

		// an escape sequence needs two more bytes after the '%'
		if pos+3 > len(input) {
			logger.Error("malformed filename while unescapePercent", zap.String("filename", input))
			return input
		}

		raw, err := hex.DecodeString(input[pos+1 : pos+3])
		if err != nil {
			logger.Error("malformed filename while unescapePercent", zap.String("filename", input))
			return input
		}
		decoded.Write(raw)
		pos += 3
	}

	return decoded.String()
}

func (l *Loader) skipSchemaAndTable(table *filter.Table) bool {
if filter.IsSystemSchema(table.Schema) {
return true
}

table.Schema = unescapePercent(table.Schema, l.logCtx.L())
table.Name = unescapePercent(table.Name, l.logCtx.L())

tbs := []*filter.Table{table}
tbs = l.baList.ApplyOn(tbs)
return len(tbs) == 0
Expand Down Expand Up @@ -1060,7 +1092,7 @@ func (l *Loader) restoreStructure(ctx context.Context, conn *DBConn, sqlFile str
dstSchema, dstTable := fetchMatchedLiteral(tctx, l.tableRouter, schema, table)
// for table
if table != "" {
sqls = append(sqls, fmt.Sprintf("USE `%s`;", dstSchema))
sqls = append(sqls, fmt.Sprintf("USE `%s`;", unescapePercent(dstSchema, l.logCtx.L())))
query = renameShardingTable(query, table, dstTable)
} else {
query = renameShardingSchema(query, schema, dstSchema)
Expand Down
2 changes: 1 addition & 1 deletion tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ function cleanup_data() {
rm -rf $WORK_DIR
mkdir $WORK_DIR
for target_db in "$@"; do
run_sql "drop database if exists ${target_db}" $TIDB_PORT $TIDB_PASSWORD
run_sql "drop database if exists \`${target_db}\`" $TIDB_PORT $TIDB_PASSWORD
done
run_sql "drop database if exists dm_meta" $TIDB_PORT $TIDB_PASSWORD
}
Expand Down
48 changes: 48 additions & 0 deletions tests/full_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,54 @@ function fail_acquire_global_lock() {
cleanup_process $*
}

# escape_schema runs a full-mode task against a schema named `full/mode`
# (the name contains a `/`), then checks upstream/downstream consistency and
# that the dump files were removed afterwards.
# NOTE(review): run_sql_file, check_contains, check_count, run_dm_master, etc.
# are helpers defined outside this function; their exact behavior is assumed
# from their names.
function escape_schema() {
# work on copies of the data/config files so the originals stay untouched
cp $cur/data/db1.prepare.sql $WORK_DIR/db1.prepare.sql
cp $cur/data/db2.prepare.sql $WORK_DIR/db2.prepare.sql
cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
cp $cur/conf/diff_config.toml $WORK_DIR/diff_config.toml
# rename every occurrence of `full_mode` to `full/mode` in the copied files
sed -i "s/full_mode/full\/mode/g" $WORK_DIR/db1.prepare.sql $WORK_DIR/db2.prepare.sql $WORK_DIR/dm-task.yaml $WORK_DIR/diff_config.toml

# load the rewritten prepare SQL into both upstream sources
run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

# test load data with `/` in the table name
run_sql_source1 "create table \`full/mode\`.\`tb\/1\` (id int, name varchar(10), primary key(\`id\`));"
run_sql_source1 "insert into \`full/mode\`.\`tb\/1\` values(1,'haha');"
run_sql_source1 "insert into \`full/mode\`.\`tb\/1\` values(2,'hihi');"

# run the user-preparation SQL on both sources (presumably creates users/grants — confirm against the data files)
run_sql_file $cur/data/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_count 'Query OK, 0 rows affected' 7
run_sql_file $cur/data/db2.prepare.user.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_count 'Query OK, 0 rows affected' 7

# start one DM-master and two DM-workers, waiting for each to come online
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
# insert a `relay-dir:` line before the line matching `relay-binlog-name` in each source config
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

# start DM task only
dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta"
# compare upstream and downstream data using the rewritten diff config
check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml

echo "check dump files have been cleaned"
# `ls` succeeds only if the dump directory still exists; in that case fail the test with exit 1
ls $WORK_DIR/worker1/dumped_data.test && exit 1 || echo "worker1 auto removed dump files"
ls $WORK_DIR/worker2/dumped_data.test && exit 1 || echo "worker2 auto removed dump files"

# drop the `full/mode` database downstream and stop the processes started above
cleanup_data full/mode
cleanup_process $*
}

function run() {
fail_acquire_global_lock

Expand Down