Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: watch the ddl ownerkey with the createRevision (#55692) #57041

Open
wants to merge 1 commit into
base: release-6.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions owner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ go_test(
],
embed = [":owner"],
flaky = True,
<<<<<<< HEAD:owner/BUILD.bazel
=======
shard_count = 7,
>>>>>>> 0720ea86949 (ddl: watch the ddl ownerkey with the createRevision (#55692)):pkg/owner/BUILD.bazel
deps = [
"//ddl",
"//infoschema",
Expand Down
149 changes: 139 additions & 10 deletions owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,18 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
continue
}

<<<<<<< HEAD:owner/manager.go
ownerKey, err := GetOwnerInfo(campaignContext, logCtx, elec, m.id)
=======
ownerKey, currRev, err := GetOwnerKeyInfo(campaignContext, logCtx, m.etcdCli, m.key, m.id)
>>>>>>> 0720ea86949 (ddl: watch the ddl ownerkey with the createRevision (#55692)):pkg/owner/manager.go
if err != nil {
continue
}

m.toBeOwner(elec)
m.watchOwner(campaignContext, etcdSession, ownerKey)
err = m.watchOwner(campaignContext, etcdSession, ownerKey, currRev)
logutil.Logger(logCtx).Info("watch owner finished", zap.Error(err))
m.RetireOwner()

metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc()
Expand All @@ -274,9 +279,56 @@ func (m *ownerManager) revokeSession(_ string, leaseID clientv3.LeaseID) {

// GetOwnerID implements Manager.GetOwnerID interface.
func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) {
<<<<<<< HEAD:owner/manager.go
resp, err := m.etcdCli.Get(ctx, m.key, clientv3.WithFirstCreate()...)
=======
_, ownerID, _, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
return string(ownerID), errors.Trace(err)
}

func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, int64, error) {
var op OpType
var resp *clientv3.GetResponse
var err error
for i := 0; i < 3; i++ {
if err = ctx.Err(); err != nil {
return "", nil, op, 0, 0, errors.Trace(err)
}

childCtx, cancel := context.WithTimeout(ctx, util.KeyOpDefaultTimeout)
resp, err = etcdCli.Get(childCtx, ownerPath, clientv3.WithFirstCreate()...)
cancel()
if err == nil {
break
}
logutil.Logger(logCtx).Info("etcd-cli get owner info failed", zap.String("key", ownerPath), zap.Int("retryCnt", i), zap.Error(err))
time.Sleep(util.KeyOpRetryInterval)
}
if err != nil {
return "", errors.Trace(err)
logutil.Logger(logCtx).Warn("etcd-cli get owner info failed", zap.Error(err))
return "", nil, op, 0, 0, errors.Trace(err)
}
if len(resp.Kvs) == 0 {
return "", nil, op, 0, 0, concurrency.ErrElectionNoLeader
}

var ownerID []byte
ownerID, op = splitOwnerValues(resp.Kvs[0].Value)
logutil.Logger(logCtx).Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key),
zap.ByteString("ownerID", ownerID), zap.Stringer("op", op))
return string(resp.Kvs[0].Key), ownerID, op, resp.Header.Revision, resp.Kvs[0].ModRevision, nil
}

// GetOwnerKeyInfo gets the owner key and current revision.
func GetOwnerKeyInfo(
ctx, logCtx context.Context,
etcdCli *clientv3.Client,
etcdKey, id string,
) (string, int64, error) {
ownerKey, ownerID, _, currRevision, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey)
>>>>>>> 0720ea86949 (ddl: watch the ddl ownerkey with the createRevision (#55692)):pkg/owner/manager.go
if err != nil {
return "", 0, errors.Trace(err)
}
if len(resp.Kvs) == 0 {
return "", concurrency.ErrElectionNoLeader
Expand All @@ -296,44 +348,121 @@ func GetOwnerInfo(ctx, logCtx context.Context, elec *concurrency.Election, id st
logutil.Logger(logCtx).Info("get owner", zap.String("ownerID", ownerID))
if ownerID != id {
logutil.Logger(logCtx).Warn("is not the owner")
return "", errors.New("ownerInfoNotMatch")
return "", 0, errors.New("ownerInfoNotMatch")
}

<<<<<<< HEAD:owner/manager.go
return string(resp.Kvs[0].Key), nil
=======
return ownerKey, currRevision, nil
}

func splitOwnerValues(val []byte) ([]byte, OpType) {
vals := bytes.Split(val, []byte("_"))
var op OpType
if len(vals) == 2 {
op = OpType(vals[1][0])
}
return vals[0], op
}

func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string) {
func joinOwnerValues(vals ...[]byte) []byte {
return bytes.Join(vals, []byte("_"))
}

// SetOwnerOpValue implements Manager.SetOwnerOpValue interface.
func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error {
// owner don't change.
ownerKey, ownerID, currOp, _, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
if err != nil {
return errors.Trace(err)
}
if currOp == op {
logutil.Logger(m.logCtx).Info("set owner op is the same as the original, so do nothing.", zap.Stringer("op", op))
return nil
}
if string(ownerID) != m.id {
return errors.New("ownerInfoNotMatch")
}
newOwnerVal := joinOwnerValues(ownerID, []byte{byte(op)})

failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) {
if valStr, ok := v.(string); ok {
if err := mockDelOwnerKey(valStr, ownerKey, m); err != nil {
failpoint.Return(err)
}
}
})

leaseOp := clientv3.WithLease(clientv3.LeaseID(m.sessionLease.Load()))
resp, err := m.etcdCli.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(ownerKey), "=", modRevision)).
Then(clientv3.OpPut(ownerKey, string(newOwnerVal), leaseOp)).
Commit()
if err == nil && !resp.Succeeded {
err = errors.New("put owner key failed, cmp is false")
}
logutil.BgLogger().Info("set owner op value", zap.String("owner key", ownerKey), zap.ByteString("ownerID", ownerID),
zap.Stringer("old Op", currOp), zap.Stringer("op", op), zap.Error(err))
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.PutValue+"_"+metrics.RetLabel(err)).Inc()
return errors.Trace(err)
}

// GetOwnerOpValue gets the owner op value.
func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath, logPrefix string) (OpType, error) {
// It's using for testing.
if etcdCli == nil {
return *mockOwnerOpValue.Load(), nil
}

logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
_, _, op, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath)
return op, errors.Trace(err)
>>>>>>> 0720ea86949 (ddl: watch the ddl ownerkey with the createRevision (#55692)):pkg/owner/manager.go
}

// WatchOwnerForTest watches the ownerKey.
// This function is used to test watchOwner().
func WatchOwnerForTest(ctx context.Context, m Manager, etcdSession *concurrency.Session, key string, createRevison int64) error {
if ownerManager, ok := m.(*ownerManager); ok {
return ownerManager.watchOwner(ctx, etcdSession, key, createRevison)
}
return nil
}

func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string, currRev int64) error {
logPrefix := fmt.Sprintf("[%s] ownerManager %s watch owner key %v", m.prompt, m.id, key)
logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
logutil.BgLogger().Debug(logPrefix)
watchCh := m.etcdCli.Watch(ctx, key)
// we need to watch the ownerKey since currRev + 1.
watchCh := m.etcdCli.Watch(ctx, key, clientv3.WithRev(currRev+1))
for {
select {
case resp, ok := <-watchCh:
if !ok {
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.WatcherClosed).Inc()
logutil.Logger(logCtx).Info("watcher is closed, no owner")
return
return errors.Errorf("watcher is closed, key: %v", key)
}
if resp.Canceled {
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Cancelled).Inc()
logutil.Logger(logCtx).Info("watch canceled, no owner")
return
return errors.Errorf("watch canceled, key: %v", key)
}

for _, ev := range resp.Events {
if ev.Type == mvccpb.DELETE {
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Deleted).Inc()
logutil.Logger(logCtx).Info("watch failed, owner is deleted")
return
return nil
}
}
case <-etcdSession.Done():
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.SessionDone).Inc()
return
return nil
case <-ctx.Done():
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.CtxDone).Inc()
return
return nil
}
}
}
Expand Down
Loading
Loading