Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#56362
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
fishiu authored and ti-chi-bot committed Nov 5, 2024
1 parent 0627e65 commit ab74543
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 4 deletions.
4 changes: 4 additions & 0 deletions owner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ go_test(
],
embed = [":owner"],
flaky = True,
    shard_count = 9,
deps = [
"//ddl",
"//infoschema",
Expand Down
8 changes: 4 additions & 4 deletions owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func (m *ownerManager) CampaignOwner(withTTL ...int) error {
}
m.sessionLease.Store(int64(session.Lease()))
m.wg.Add(1)
go m.campaignLoop(session)
var campaignContext context.Context
campaignContext, m.campaignCancel = context.WithCancel(m.ctx)
go m.campaignLoop(campaignContext, session)
return nil
}

Expand Down Expand Up @@ -226,9 +228,7 @@ func (m *ownerManager) CampaignCancel() {
m.wg.Wait()
}

func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
var campaignContext context.Context
campaignContext, m.campaignCancel = context.WithCancel(m.ctx)
func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession *concurrency.Session) {
defer func() {
m.campaignCancel()
if r := recover(); r != nil {
Expand Down
125 changes: 125 additions & 0 deletions owner/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,3 +379,128 @@ func deleteLeader(cli *clientv3.Client, prefixKey string) error {
_, err = cli.Delete(context.Background(), string(resp.Kvs[0].Key))
return errors.Trace(err)
}

// TestImmediatelyCancel checks that starting a campaign and cancelling it
// right away is safe to do repeatedly. Before the fix, this pattern raced
// on ownerManager.campaignCancel between CampaignOwner and campaignLoop.
func TestImmediatelyCancel(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
	}
	integration.BeforeTestExternal(t)

	tInfo := newTestInfo(t)
	defer tInfo.Close(t)
	mgr := tInfo.ddl.OwnerManager()
	// Ten rounds of start-then-cancel; any race or deadlock shows up under -race.
	for round := 0; round < 10; round++ {
		require.NoError(t, mgr.CampaignOwner())
		mgr.CampaignCancel()
	}
}

// TestAcquireDistributedLock spins up an embedded etcd server and exercises
// owner.AcquireDistributedLock: mutual exclusion within one client, across
// two clients, and lock takeover after the holder's client (and lease) dies.
func TestAcquireDistributedLock(t *testing.T) {
	const addrFmt = "http://127.0.0.1:%d"
	cfg := embed.NewConfig()
	cfg.Dir = t.TempDir()
	// rand port in [20000, 60000)
	port := int(rand.Int31n(40000)) + 20000
	clientURL, _ := url.Parse(fmt.Sprintf(addrFmt, port))
	peerURL, _ := url.Parse(fmt.Sprintf(addrFmt, port+1))
	cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{*clientURL}, []url.URL{*clientURL}
	cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{*peerURL}, []url.URL{*peerURL}
	cfg.InitialCluster = "default=" + peerURL.String()
	cfg.Logger = "zap"
	embedEtcd, err := embed.StartEtcd(cfg)
	require.NoError(t, err)
	<-embedEtcd.Server.ReadyNotify()
	t.Cleanup(func() {
		embedEtcd.Close()
	})

	// newClient returns an etcd client pointed at the embedded server,
	// closed automatically when the (sub)test finishes.
	newClient := func(t *testing.T) *clientv3.Client {
		cli, err := clientv3.New(clientv3.Config{
			Endpoints: []string{clientURL.String()},
		})
		require.NoError(t, err)
		t.Cleanup(func() {
			cli.Close()
		})
		return cli
	}

	// expectBlockedUntilReleased asserts that acquiring "test-lock" with
	// waiter blocks while the current holder keeps it, and succeeds once
	// release() is invoked.
	expectBlockedUntilReleased := func(t *testing.T, waiter *clientv3.Client, release func()) {
		acquired := make(chan struct{})
		var wg util.WaitGroupWrapper
		wg.Run(func() {
			// Acquire another distributed lock will be blocked.
			rel, err := owner.AcquireDistributedLock(context.Background(), waiter, "test-lock", 10)
			require.NoError(t, err)
			acquired <- struct{}{}
			rel()
		})
		timer := time.NewTimer(300 * time.Millisecond)
		select {
		case <-acquired:
			require.FailNow(t, "acquired same lock unexpectedly")
		case <-timer.C:
			release()
			<-acquired
		}
		wg.Wait()
	}

	t.Run("acquire distributed lock with same client", func(t *testing.T) {
		cli := newClient(t)
		ctx := context.Background()

		release1, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10)
		require.NoError(t, err)
		expectBlockedUntilReleased(t, cli, release1)

		// Distinct lock keys must not block each other.
		release1, err = owner.AcquireDistributedLock(ctx, cli, "test-lock/1", 10)
		require.NoError(t, err)
		release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock/2", 10)
		require.NoError(t, err)
		release1()
		release2()
	})

	t.Run("acquire distributed lock with different clients", func(t *testing.T) {
		cli1 := newClient(t)
		cli2 := newClient(t)

		release1, err := owner.AcquireDistributedLock(context.Background(), cli1, "test-lock", 10)
		require.NoError(t, err)
		expectBlockedUntilReleased(t, cli2, release1)
	})

	t.Run("acquire distributed lock until timeout", func(t *testing.T) {
		cli1 := newClient(t)
		cli2 := newClient(t)
		ctx := context.Background()

		_, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 1)
		require.NoError(t, err)
		cli1.Close() // Note that release() is not invoked.

		// The first holder's 1-second lease lapses once its client is gone,
		// so the second client can take over the same key.
		release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10)
		require.NoError(t, err)
		release2()
	})
}

0 comments on commit ab74543

Please sign in to comment.