Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/release-6.5' into cherry-pick-…
Browse files Browse the repository at this point in the history
…48687-to-release-6.5
  • Loading branch information
lance6716 committed Nov 25, 2024
2 parents 56f07d6 + ae4703a commit 409958b
Show file tree
Hide file tree
Showing 48 changed files with 1,121 additions and 330 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3603,8 +3603,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:dsB3wb0b7Fs4fhoPutSEW2GgQqPRsYt2lXIP9eRXUVs=",
version = "v2.0.4-0.20240827021516-18287765af05",
sum = "h1:lKLA4jW6wj/A15+sb901WXvGd4xvdGuGDOndtyVTV/8=",
version = "v2.0.4-0.20240910032334-87841020c53e",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/aws/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ go_library(
deps = [
"//br/pkg/config",
"//br/pkg/glue",
"//br/pkg/logutil",
"//br/pkg/utils",
"@com_github_aws_aws_sdk_go//aws",
"@com_github_aws_aws_sdk_go//aws/awserr",
"@com_github_aws_aws_sdk_go//aws/client",
"@com_github_aws_aws_sdk_go//aws/request",
"@com_github_aws_aws_sdk_go//aws/session",
"@com_github_aws_aws_sdk_go//service/cloudwatch",
"@com_github_aws_aws_sdk_go//service/ec2",
Expand Down
47 changes: 47 additions & 0 deletions br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/ec2"
Expand All @@ -19,6 +21,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/config"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand All @@ -40,11 +43,55 @@ type EC2Session struct {

type VolumeAZs map[string]string

type ebsBackupRetryer struct {
delegate request.Retryer
}

func (e *ebsBackupRetryer) MaxRetries() int {
return e.delegate.MaxRetries()
}

var backOffTimeOverride = map[string]time.Duration{
// From the SDK:
// Sadly it seems there isn't an exported operation name...
// const opCreateSnapshots = "CreateSnapshots"
// The quota for create snapshots is 5 per minute.
// Back off for a longer time so we won't excced it.
"CreateSnapshots": 20 * time.Second,
// const opCreateVolume = "CreateVolume"
"CreateVolume": 20 * time.Second,
}

func (e *ebsBackupRetryer) RetryRules(r *request.Request) time.Duration {
backOff := e.delegate.RetryRules(r)
if override, ok := backOffTimeOverride[r.Operation.Name]; ok {
if override > backOff {
backOff = override
}
}
log.Warn(
"Retrying an operation.",
logutil.ShortError(r.Error),
zap.Duration("backoff", backOff),
zap.StackSkip("stack", 1),
)
return backOff
}

func (e *ebsBackupRetryer) ShouldRetry(r *request.Request) bool {
return e.delegate.ShouldRetry(r)
}

func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
// aws-sdk has builtin exponential backoff retry mechanism, see:
// https://github.com/aws/aws-sdk-go/blob/db4388e8b9b19d34dcde76c492b17607cd5651e2/aws/client/default_retryer.go#L12-L16
// with default retryer & max-retry=9, we will wait for at least 30s in total
awsConfig := aws.NewConfig().WithMaxRetries(9).WithRegion(region)
defRetry := new(client.DefaultRetryer)
ourRetry := ebsBackupRetryer{
delegate: defRetry,
}
awsConfig.Retryer = ourRetry
// TiDB Operator need make sure we have the correct permission to call aws api(through aws env variables)
// we may change this behaviour in the future.
sessionOptions := session.Options{Config: *awsConfig}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,7 @@ func (trecv *timeoutRecv) Refresh() {
func (trecv *timeoutRecv) Stop() {
close(trecv.refresh)
trecv.wg.Wait()
trecv.cancel()
}

var TimeoutOneResponse = time.Hour
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/backup/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,13 @@ func TestTimeoutRecvCancel(t *testing.T) {
cancel()
trecv.wg.Wait()
}

func TestTimeoutRecvCanceled(t *testing.T) {
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)
defer cancel()

tctx, trecv := StartTimeoutRecv(cctx, time.Hour)
trecv.Stop()
require.Equal(t, "context canceled", tctx.Err().Error())
}
1 change: 1 addition & 0 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ func (importer *FileImporter) downloadSST(
rule := import_sstpb.RewriteRule{
OldKeyPrefix: encodeKeyPrefix(fileRule.GetOldKeyPrefix()),
NewKeyPrefix: encodeKeyPrefix(fileRule.GetNewKeyPrefix()),
NewTimestamp: fileRule.NewTimestamp,
}
sstMeta := GetSSTMetaFromFile(id, file, regionInfo.Region, &rule)

Expand Down
114 changes: 114 additions & 0 deletions ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -142,6 +143,119 @@ func newDefaultCallBack(do DomainReloader) Callback {

// ****************************** End of Default DDL Callback Instance *********************************************

// ****************************** Start of Test DDL Callback Instance ***********************************************

// TestDDLCallback is used to customize user callback themselves.
type TestDDLCallback struct {
*BaseCallback
// We recommended to pass the domain parameter to the test ddl callback, it will ensure
// domain to reload schema before your ddl stepping into the next state change.
Do DomainReloader

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
OnJobSchemaStateChanged func(int64)
}

// OnChanged mock the same behavior with the main DDL hook.
func (tc *TestDDLCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")
if tc.Do != nil {
err = tc.Do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}
}
return nil
}

// OnSchemaStateChanged mock the same behavior with the main ddl hook.
func (tc *TestDDLCallback) OnSchemaStateChanged(schemaVer int64) {
if tc.Do != nil {
if err := tc.Do.Reload(); err != nil {
logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err))
}
}

if tc.OnJobSchemaStateChanged != nil {
tc.OnJobSchemaStateChanged(schemaVer)
return
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
}
if tc.onJobRunBefore != nil {
tc.onJobRunBefore(job)
return
}

tc.BaseCallback.OnJobRunBefore(job)
}

// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
if onJobUpdatedExportedFunc := tc.OnJobUpdatedExported.Load(); onJobUpdatedExportedFunc != nil {
(*onJobUpdatedExportedFunc)(job)
return
}
if tc.onJobUpdated != nil {
tc.onJobUpdated(job)
return
}

tc.BaseCallback.OnJobUpdated(job)
}

// OnWatched is used to run the user customized logic of `OnWatched` first.
func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
if tc.onWatched != nil {
tc.onWatched(ctx)
return
}

tc.BaseCallback.OnWatched(ctx)
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (tc *TestDDLCallback) OnGetJobBefore(jobType string) {
if tc.OnGetJobBeforeExported != nil {
tc.OnGetJobBeforeExported(jobType)
return
}
tc.BaseCallback.OnGetJobBefore(jobType)
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) {
if tc.OnGetJobAfterExported != nil {
tc.OnGetJobAfterExported(jobType, job)
return
}
tc.BaseCallback.OnGetJobAfter(jobType, job)
}

// Clone copies the callback and take its reference
func (tc *TestDDLCallback) Clone() *TestDDLCallback {
return &*tc
}

// ****************************** End of Test DDL Callback Instance ***********************************************

// ****************************** Start of CTC DDL Callback Instance ***********************************************

// ctcCallback is the customized callback that TiDB will use.
Expand Down
113 changes: 0 additions & 113 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@ package ddl

import (
"context"
"sync/atomic"
"testing"

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

type TestInterceptor struct {
Expand All @@ -41,115 +37,6 @@ func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema
return ti.BaseInterceptor.OnGetInfoSchema(ctx, is)
}

// TestDDLCallback is used to customize user callback themselves.
type TestDDLCallback struct {
*BaseCallback
// We recommended to pass the domain parameter to the test ddl callback, it will ensure
// domain to reload schema before your ddl stepping into the next state change.
Do DomainReloader

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
OnJobSchemaStateChanged func(int64)
}

// OnChanged mock the same behavior with the main DDL hook.
func (tc *TestDDLCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")
if tc.Do != nil {
err = tc.Do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}
}
return nil
}

// OnSchemaStateChanged mock the same behavior with the main ddl hook.
func (tc *TestDDLCallback) OnSchemaStateChanged(schemaVer int64) {
if tc.Do != nil {
if err := tc.Do.Reload(); err != nil {
logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err))
}
}

if tc.OnJobSchemaStateChanged != nil {
tc.OnJobSchemaStateChanged(schemaVer)
return
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
}
if tc.onJobRunBefore != nil {
tc.onJobRunBefore(job)
return
}

tc.BaseCallback.OnJobRunBefore(job)
}

// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
if onJobUpdatedExportedFunc := tc.OnJobUpdatedExported.Load(); onJobUpdatedExportedFunc != nil {
(*onJobUpdatedExportedFunc)(job)
return
}
if tc.onJobUpdated != nil {
tc.onJobUpdated(job)
return
}

tc.BaseCallback.OnJobUpdated(job)
}

// OnWatched is used to run the user customized logic of `OnWatched` first.
func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
if tc.onWatched != nil {
tc.onWatched(ctx)
return
}

tc.BaseCallback.OnWatched(ctx)
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (tc *TestDDLCallback) OnGetJobBefore(jobType string) {
if tc.OnGetJobBeforeExported != nil {
tc.OnGetJobBeforeExported(jobType)
return
}
tc.BaseCallback.OnGetJobBefore(jobType)
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) {
if tc.OnGetJobAfterExported != nil {
tc.OnGetJobAfterExported(jobType, job)
return
}
tc.BaseCallback.OnGetJobAfter(jobType, job)
}

// Clone copies the callback and take its reference
func (tc *TestDDLCallback) Clone() *TestDDLCallback {
return &*tc
}

func TestCallback(t *testing.T) {
cb := &BaseCallback{}
require.Nil(t, cb.OnChanged(nil))
Expand Down
Loading

0 comments on commit 409958b

Please sign in to comment.