// Lock locks the mutex with a cancelable context. If the context is canceled // while trying to acquire the lock, the mutex tries to clean its stale lock entry. // Lock,即锁的竞争应该有事件限制,非必需无限等待的场景下应传入超时控制的上下文 func(m *Mutex) Lock(ctx context.Context) error { s := m.s client := m.s.Client()
// 将 LeaseId 作为唯一性的保证,拼接出一个具有相同前缀又全局唯一的key m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) // put self in lock waiters via myKey; oldest waiter holds lock put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) // reuse key in case this session already holds the lock get := v3.OpGet(m.myKey) // fetch current holder to complete uncontended path with only one RPC getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() if err != nil { return err } m.myRev = resp.Header.Revision if !resp.Succeeded { m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision } // 锁的判断逻辑 // if no key on prefix / the minimum rev is key, already hold the lock ownerKey := resp.Responses[1].GetResponseRange().Kvs iflen(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header returnnil }
// wait for deletion revisions prior to myKey hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if wait failed if werr != nil { m.Unlock(client.Ctx()) } else { m.hdr = hdr } return werr }