-
Zookeeper应用场景
-
数据发布与订阅:发布订阅模型,就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现数据的集中管理和动态更新。
-
配置中心:在应用中,将全局的配置信息放到ZK上集中管理,在应用启动的时候主动获取一次配置。同时,在节点上注册一个watcher,保证每次数据更新时会通知订阅者更新配置信息。
-
元数据维护:在分布式搜索服务中,索引的元信息和服务器集群机器的节点状态保存在ZK的指定节点上,供各个客户端使用。
-
分布式日志收集系统:这个系统的核心工作是收集不同服务器的日志。收集器通常是按照应用来分配任务单元,因此在ZK上用应用名创建一个节点,将需要收集的服务器的IP注册到这个节点的子结点上。
-
-
命名服务:客户端应用能够根据指定名字来获取资源或服务的地址、提供者等信息。
-
分布式通知/协调:ZK中特有的watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是:不同系统对ZK上的同一个节点进行注册,监听节点的变化。任意一个系统对节点进行了更新,其它系统都能接到通知,并进行相应处理。
-
任务汇报:类似于任务分发系统。子任务启动后,在ZK上注册一个临时节点,并定时将自己的进度写入这个节点,这样任务管理者可以实时查看任务进度。
-
服务器列表维护:能都动态监听服务器的上下线
-
服务端代码
-
-
package zookeeper; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; public class Server { private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182"; private static final int TIME_OUT = 15000; private ZooKeeper zooKeeper; public Server() throws IOException { this.zooKeeper = new ZooKeeper(CONNECT_STRING, TIME_OUT, watchedEvent -> { }); } /** * 创建临时序列节点(服务器关闭时节点会自动删除) * @param serverName * @return * @throws Exception */ public String regsterServer(String serverName) throws Exception { String result = zooKeeper.create("/clusterServer/server", serverName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(serverName+" server start...."); return result; } public static void main(String[] args) throws Exception { Server server = new Server(); server.regsterServer(args[0]); //保持程序运行,防止程序停止运行导致节点自动删除 while (true) { } } }
-
-
- 客户端代码
-
package zookeeper; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * 获取并监听注册的服务器列表 */ public class Client { private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182"; private static final int TIME_OUT = 15000; private ZooKeeper zooKeeper; public Client() throws IOException { this.zooKeeper = new ZooKeeper(CONNECT_STRING, TIME_OUT, watchedEvent -> { System.out.println("watcher works."); try { getServers(); } catch (Exception e) { e.printStackTrace(); } }); } public void getServers() throws Exception { List<String> servers = new ArrayList<>(); List<String> children = zooKeeper.getChildren("/clusterServer", true, null); for (String child : children) { String server = getData("/clusterServer/"+child); servers.add(server); } System.out.println(servers); } public String getData(String path) throws Exception { byte[] data = zooKeeper.getData(path, true, null); return new String(data); } public static void main(String[] args) throws Exception { Client client = new Client(); client.getServers(); while (true) { } } }
-
-
分布式锁:
-
保持独占:通常的做法是把ZK的节点看作一把锁,通过create node的方式来实现。
-
-
具体的实现方式是:
-
-
-
方法一:所有的客户端都去创建/lock节点,最终创建成功的持有这把锁。(同一个节点只能创建一次,再次创建会返回失败信息)
-
-
/** * 通过创建临时节点,实现服务器之间的独占锁 */ @Test public void singleLock() { try { //参数:1,节点路径; 2,要存储的数据; 3,节点的权限; 4,节点的类型 String nodePath = zooKeeper.create("/lock", "This is Lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); //创建成功,则相当于拥有独占锁,可以进行以下逻辑 //TODO 业务逻辑 System.out.println(nodePath); //业务逻辑结束后,删除节点,即释放锁资源 zooKeeper.delete("/lock", -1); } catch (Exception e) { //创建节点失败,重新调用,直至创建成功 if (e instanceof KeeperException && "NODEEXISTS".equals(((KeeperException)e).code().name())) { System.out.println("Node exists."); singleLock(); }else { e.printStackTrace(); } } }
-
-
-
-
方法二:判断/lock节点是否存在,如果存在,说明其它服务器已经持有锁资源;如果不存在,则锁资源处于空闲状态,创建节点占有锁资源。
-
-
-
/** * 通过创建临时节点,实现服务器之间的独占锁 */ @Test public void singleLock2() throws KeeperException, InterruptedException { Stat stat = zooKeeper.exists("/lock", false); //如果节点已经存在,等待其它服务器删除节点。即:等待其它服务器释放锁资源 while(stat != null) { } //参数:1,节点路径; 2,要存储的数据; 3,节点的权限; 4,节点的类型 String nodePath = zooKeeper.create("/lock", "This is Lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); //创建成功,则相当于拥有独占锁,可以进行以下逻辑 //TODO 业务逻辑 System.out.println(nodePath); //业务逻辑结束后,删除节点,即释放锁资源 zooKeeper.delete("/lock", -1); }
-
-
控制时序:/lock节点已存在,客户端在它下面创建临时有序节点/lock/{sessionId}-1 , /lock/{sessionId}-2 , /lock/{sessionId}-3 …..(通过节点属性控制 CreateMode.EPHEMERAL_SEQUENTIAL控制),保证子节点创建的有序性,从而保证的客户端的时序性。
-
方式一:获取锁资源时,程序处于block状态,直到获取锁资源。
-
-
/** * 通过创建临时时序节点,实现服务器之间的时序锁 */ @Test public void lock() throws KeeperException, InterruptedException { //创建临时时序节点 String nodePath = zooKeeper.create("/lock/sublock", "This is sub lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); while(true) { List<String> children = zooKeeper.getChildren("/lock", false); //排序,并获取序号最小的节点。(序号越小,表明请求时间越早,优先获取锁资源) children.sort(String::compareTo); if (nodePath.equals("/lock/"+children.get(0))){ //TODO 业务逻辑 System.out.println("TODO Logic."); break; } } //业务逻辑结束后,删除节点,即释放锁资源 zooKeeper.delete(nodePath, -1); }
-
-
- 方式二:通过watcher监听服务器列表变化,判断当前服务器是否获取锁资源,程序不会block
-
package zookeeper; import org.apache.zookeeper.*; import java.io.IOException; import java.util.Collections; import java.util.List; /** * * 通过Zookeeper实现服务器之间的时序锁 * */ public class SeqLock { private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182"; private static ZooKeeper zooKeeper; private String thispath; public SeqLock() throws IOException { //超时时间单位:毫秒 zooKeeper = new ZooKeeper(CONNECT_STRING, 15000, event -> { //监听“/lock”的子节点变化。如果有服务器释放锁,判断自己是否获取锁 if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged && event.getPath().startsWith("/lock")) { try { List<String> children = zooKeeper.getChildren("/lock", false); if (children.size() > 0) { Collections.sort(children); String fistNode = "/lock/"+children.get(0); if (fistNode.equals(thispath)){ doSomethingAndDelNode(); } } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }); } /** * 通过创建临时时序节点注册时序锁,并监听服务器列表 */ public void lock() throws KeeperException, InterruptedException { thispath = zooKeeper.create("/lock/sublock", "This is sub lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(thispath); List<String> children = zooKeeper.getChildren("/lock", false); //如果只有一个子节点,说明锁资源被当前服务器持有。如果子节点不止一个,说明锁资源已经被其它服务器持有 if(children.size() == 1) { doSomethingAndDelNode(); } } private void doSomethingAndDelNode() throws InterruptedException, KeeperException { //TODO 业务逻辑 System.out.println("TODO Logic."); //业务逻辑结束后,删除节点,即释放锁资源 zooKeeper.delete(thispath, -1); } public static void main(String[] args) { try { SeqLock lock = new SeqLock(); lock.lock(); } catch (Exception e) { e.printStackTrace(); } } }
参考文献:
-
ZooKeeper典型应用场景
链接 :https://pan.baidu.com/s/1lohZ98K33z_Hf_8Y370Nuw 提取码:kydw
出处:https://www.cnblogs.com/BlueStarWei/p/15211010.html
最新更新
python爬虫及其可视化
使用python爬取豆瓣电影短评评论内容
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
uniapp/H5 获取手机桌面壁纸 (静态壁纸)
[前端] DNS解析与优化
为什么在js中需要添加addEventListener()?
JS模块化系统
js通过Object.defineProperty() 定义和控制对象
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比