首页 > Python基础教程 >
-
ZooKeeper 实现分布式锁的方法示例
ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。
节点
在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。
Znode 又分为以下四种类型:
类型 | 描述 |
---|---|
持久节点 | 节点创建后,会一直存在,不会因客户端会话失效而删除 |
持久顺序节点 | 基本特性与持久节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名 |
临时节点 | 客户端会话失效或连接关闭后,该节点会被自动删除 |
临时顺序节点 | 基本特性与临时节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名 |
锁原理
ZooKeeper 分布式锁是基于 临时顺序节点 来实现的,锁可理解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。
而且用临时顺序节点的另外一个用意是如果某个客户端创建临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个客户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。
如上图:ClientA 和 ClientB 同时想获取锁,所以都在 locks 节点下创建了一个临时节点 1 和 2,而 1 是当前 locks 节点下排列序号第一的节点,所以 ClientA 获取锁成功,而 ClientB 处于等待状态,这时 ZooKeeper 中的 2 节点会监听 1 节点,当 1节点锁释放(节点被删除)时,2 就变成了 locks 节点下排列序号第一的节点,这样 ClientB 就获取锁成功了。
代码测试
请确保 ZooKeeper 服务已启动,ZooKeeper 的搭建可参考Kafka 集群 中的 ZooKeeper 集群部分
以下是基于 C# 的测试,Java 可使用 Curator 框架,实现原理和上面描述是一致的,有兴趣可以看看源码,应该也不难理解。
创建 .NET Core 控制台程序 Nuget
安装 ZooKeeperNetEx.Recipes
创建 ZooKeeper Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
private const int CONNECTION_TIMEOUT = 50000; private const string CONNECTION_STRING = "127.0.0.1:2181" ; private ZooKeeper CreateClient() { var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance); Stopwatch sw = new Stopwatch(); sw.Start(); while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT) { var state = zooKeeper.getState(); if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTING) { break ; } } sw.Stop(); return zooKeeper; } class NullWatcher : Watcher { public static readonly NullWatcher Instance = new NullWatcher(); private NullWatcher() { } public override Task process(WatchedEvent @ event ) { return Task.CompletedTask; } } |
添加 Lock 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
/// <summary> /// 加锁 /// </summary> /// <param name="key">加锁的节点名</param> /// <param name="lockAcquiredAction">加锁成功后需要执行的逻辑</param> /// <param name="lockReleasedAction">锁释放后需要执行的逻辑,可为空</param> /// <returns></returns> public async Task Lock( string key, Action lockAcquiredAction, Action lockReleasedAction = null ) { // 获取 ZooKeeper Client ZooKeeper keeper = CreateClient(); // 指定锁节点 WriteLock writeLock = new WriteLock(keeper, $ "/{key}" , null ); var lockCallback = new LockCallback(() => { lockAcquiredAction.Invoke(); writeLock.unlock(); }, lockReleasedAction); // 绑定锁获取和释放的监听对象 writeLock.setLockListener(lockCallback); // 获取锁(获取失败时会监听上一个临时节点) await writeLock.Lock(); } class LockCallback : LockListener { private readonly Action _lockAcquiredAction; private readonly Action _lockReleasedAction; public LockCallback(Action lockAcquiredAction, Action lockReleasedAction) { _lockAcquiredAction = lockAcquiredAction; _lockReleasedAction = lockReleasedAction; } /// <summary> /// 获取锁成功回调 /// </summary> /// <returns></returns> public Task lockAcquired() { _lockAcquiredAction?.Invoke(); return Task.FromResult(0); } /// <summary> /// 释放锁成功回调 /// </summary> /// <returns></returns> public Task lockReleased() { _lockReleasedAction?.Invoke(); return Task.FromResult(0); } } |
多线程模拟测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
static async Task RunAsync() { Parallel.For(1, 10, async (i) => { await new ZooKeeprDistributedLock().Lock( "locks" , () => { Console.WriteLine($ "第{i}个请求,获取锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}" ); Thread.Sleep(1000); // 业务逻辑... }, () => { Console.WriteLine($ "第{i}个请求,释放锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}" ); Console.WriteLine( "-------------------------------" ); }); }); await Task.CompletedTask; } |
虽然模拟的是多线程并行执行,但最终都会依赖锁的获取和释放而串行执行实际业务逻辑。