Skip to content

Commit

Permalink
br: Add pre-check of duplicate table in the downstream (#55044) (#59583)
Browse files Browse the repository at this point in the history
close #55087
  • Loading branch information
ti-chi-bot authored Feb 18, 2025
1 parent 9752961 commit cd63397
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 0 deletions.
1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
ErrRestoreIncompatibleSys = errors.Normalize("incompatible system table", errors.RFCCodeText("BR:Restore:ErrRestoreIncompatibleSys"))
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))
ErrDatabasesAlreadyExisted = errors.Normalize("databases already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrDatabasesAlreadyExisted"))
ErrTablesAlreadyExisted = errors.Normalize("tables already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrTablesAlreadyExisted"))

// ErrStreamLogTaskExist is the error when stream log task already exists, because of supporting single task currently.
ErrStreamLogTaskExist = errors.Normalize("stream task already exists", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskExist"))
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
pd "github.com/tikv/pd/client"
)

type GlueClient int

const (
ClientCLP GlueClient = iota
ClientSql
)

// Glue is an abstraction of TiDB function calls used in BR.
type Glue interface {
GetDomain(store kv.Storage) (*domain.Domain, error)
Expand All @@ -36,6 +43,9 @@ type Glue interface {
// we can close domain as soon as possible.
// and we must reuse the exists session and don't close it in SQL backup job.
UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error

// GetClient returns the client type of the glue
GetClient() GlueClient
}

// Session is an abstraction of the session.Session interface.
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue
return nil
}

func (Glue) GetClient() glue.GlueClient {
return glue.ClientCLP
}

// GetSessionCtx implements glue.Glue
func (gs *tidbSession) GetSessionCtx() sessionctx.Context {
return gs.se
Expand Down Expand Up @@ -349,3 +353,7 @@ func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func
}
return fn(glueSession)
}

func (m *MockGlue) GetClient() glue.GlueClient {
return glue.ClientCLP
}
4 changes: 4 additions & 0 deletions br/pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ func (Glue) GetVersion() string {
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
return nil
}

func (Glue) GetClient() glue.GlueClient {
return glue.ClientCLP
}
1 change: 1 addition & 0 deletions br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//br/pkg/version",
"//config",
"//ddl",
"//infoschema",
"//kv",
"//parser/model",
"//parser/mysql",
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package task

import (
"context"
"fmt"
"strings"
"time"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/mathutil"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -137,6 +139,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
_ = flags.MarkHidden(FlagPDConcurrency)
_ = flags.MarkHidden(FlagBatchFlushInterval)
_ = flags.MarkHidden(FlagDdlBatchSize)
_ = flags.MarkHidden(flagUseFSR)
}

// ParseFromFlags parses the config from the flag set.
Expand Down Expand Up @@ -632,6 +635,10 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if err = client.CheckSysTableCompatibility(mgr.GetDomain(), tables); err != nil {
return errors.Trace(err)
}
} else if !client.IsIncremental() && cfg.CheckRequirements {
if err = checkTableExistence(ctx, mgr, tables, g); err != nil {
return errors.Trace(err)
}
}

sp := utils.BRServiceSafePoint{
Expand Down Expand Up @@ -851,6 +858,28 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return nil
}

func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil.Table, g glue.Glue) error {
// Tasks from br clp client use other checks to validate
if g.GetClient() != glue.ClientSql {
return nil
}
message := "table already exists: "
allUnique := true
for _, table := range tables {
_, err := mgr.GetDomain().InfoSchema().TableByName(table.DB.Name, table.Info.Name)
if err == nil {
message += fmt.Sprintf("%s.%s ", table.DB.Name, table.Info.Name)
allUnique = false
} else if !infoschema.ErrTableNotExists.Equal(err) {
return errors.Trace(err)
}
}
if !allUnique {
return errors.Annotate(berrors.ErrTablesAlreadyExisted, message)
}
return nil
}

// dropToBlackhole drop all incoming tables into black hole,
// i.e. don't execute checksum, just increase the process anyhow.
func dropToBlackhole(
Expand Down
2 changes: 2 additions & 0 deletions br/tests/br_incompatible_tidb_config/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ run_sql "drop schema $DB;"
# restore with ddl(create table) job one by one
run_br --pd $PD_ADDR restore db --db "$DB" -s "local://$TEST_DIR/$DB$TABLE" --ddl-batch-size=1

run_sql "drop schema $DB;"
run_br --pd $PD_ADDR restore db --db "$DB" -s "local://$TEST_DIR/$DB$INCREMENTAL_TABLE" --ddl-batch-size=1

# restore
run_sql "drop schema $DB;"
# restore with batch create table
run_br --pd $PD_ADDR restore db --db "$DB" -s "local://$TEST_DIR/$DB$TABLE" --ddl-batch-size=128

run_sql "drop schema $DB;"
run_br --pd $PD_ADDR restore db --db "$DB" -s "local://$TEST_DIR/$DB$INCREMENTAL_TABLE" --ddl-batch-size=128

run_sql "drop schema $DB;"
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ error = '''
failed to write and ingest
'''

["BR:Restore:ErrTablesAlreadyExisted"]
error = '''
tables already existed in restored cluster
'''

["BR:Restore:ErrUnsupportedSysTable"]
error = '''
the system table isn't supported for restoring yet
Expand Down
4 changes: 4 additions & 0 deletions executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ func (gs *tidbGlue) UseOneShotSession(_ kv.Storage, _ bool, fn func(se glue.Sess
return fn(glueSession)
}

func (*tidbGlue) GetClient() glue.GlueClient {
return glue.ClientSql
}

type tidbGlueSession struct {
// the session context of the brie task's subtask, such as `CREATE TABLE`.
se sessionctx.Context
Expand Down

0 comments on commit cd63397

Please sign in to comment.