Curator的基本使用
一. 前言
官网 : Apache Curator
Curator是netflix公司开源的一套zookeeper客户端。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。
curator它主要包含三个依赖(curator的依赖都已经放到maven仓库,你直接使用maven来构建它。对于大多数人来说,我们可能最常需要引入的是curator-recipes):
- curator-recipes:包含了curator实现的各项功能,如读写锁、互斥锁、队列等,依赖于framework和Client:http://curator.apache.org/curator-recipes/index.html
- curator-framework:包含了高层级的流式API,构建在Client之上如对节点的增删改查等:http://curator.apache.org/curator-recipes/index.html
- curator-client:Zookeeper的基础客户端实现,如连接、重试、超时处理等:http://curator.apache.org/curator-client/index.html
二. 版本问题
常用版本 :
Curator 2.x.x-兼容两个zk版本 ( zk 3.4.x 和zk 3.5.x ),Curator 3.x.x-兼容zk 3.5。
因此为了不必要的麻烦,我们推荐使用2.x.x。
最新版兼容模式的介绍:Apache Curator – 版本
说的是 Zookeeper 3.4.x 版本已经是最后一个版本,curator 最新版本移除了对 3.4.x 的支持。如果你想在 Zookeeper 3.4.x 中使用Curator ,可以选择 4.2.x 版本的 curator。curator 4.2.x 版本和 zookeeper 3.4.x 版本会在兼容模式下运行。为了使用这种模式,你必须在版本管理工具中移除对 Zookeeper 的依赖,并且重新添加对 Zookeeper 的依赖。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
三. Curator使用
1. 创建客户端连接
//3000为每次重试间隔时间(单位:毫秒),10为最大重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//-------------------第一种方式------------------------
//参数1:服务端地址+端口号,参数2:会话超时时间,参数3:连接超时时间
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.170.148:2181",
60 * 1000, 15 * 1000, retryPolicy);
//-------------------第二种方式------------------------
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.170.148:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).build();
//开启连接
client.start();
2. 创建节点
//1、基本创建
String path = client.create().forPath("/comm_msg_nd","ctx".getBytes());
String path1 = client.create().forPath("/comm_no_msg_nd");
//2、创建临时节点
String path2 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app4");
//3、创建多级节点
String path3 = client.create().creatingParentsIfNeeded().forPath("/app5/p1");
需要注意,只有叶节点可以做临时节点,所以叶节点的父节点必须是永久节点,也就是creatingParentsIfNeeded这个方法创建的父节点必须是永久节点。
3. 删除节点
//删除单独一个节点
client.delete().forPath("/comm_msg_nd");
//删除多级节点
client.create().creatingParentsIfNeeded().forPath("/p1/p2/p3/multi_nd");
client.delete().deletingChildrenIfNeeded().forPath("/p1");
//删除指定版本的节点
client.delete().withVersion(0).forPath("/comm_msg_nd");
//保证删除,失败后继续执行
client.delete().guaranteed().forPath("/comm_msg_nd");
// 删除后回调
client.delete().inBackground((curatorFramework1, curatorEvent) -> {
System.out.println(curatorFramework1);
System.out.println(curatorEvent);
}).forPath("/app1");
4. 查询节点
//1、查询数据:get
byte[] data = client.getData().forPath("/app1");
System.out.println(new String(data));
//2、查询子节点:ls
List<String> list = client.getChildren().forPath("/");
System.out.println(list);
//3、查询节点状态信息:ls -s
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);
5. 修改节点
//1、修改节点数据(基本修改)
client.setData().forPath("/app1", "333".getBytes(StandardCharsets.UTF_8));
//2、根据版本号修改
Stat stat1 = new Stat();
client.getData().storingStatIn(stat1).forPath("/app1");
client.setData().withVersion(stat1.getVersion()).forPath("/app1", "itcast".getBytes(StandardCharsets.UTF_8));
6. 事件监听
zookeeper原生支持通过注册watcher来进行事件监听,但是其使用不是特别方便,需要开发人员自己反复注册watcher,比较繁琐。Curator引入Cache来实现对zookeeper服务端事务的监听。Cache是Curator中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程Zookeeper视图的对比过程。同时Curator能够自动为开发人员处理反复注册监听,从而大大简化原生api开发的繁琐过程。
cache分为三类监听类型,单个节点监听,子节点监听,节点树监听。
- NodeCache(监听和缓存根节点变化) 只监听单一个节点(变化 添加,修改,删除)。
- PathChildrenCache(监听和缓存子节点变化) 监听这个节点下的所有子节点(变化 添加,修改,删除)。
- TreeCache(监听和缓存根节点变化和子节点变化) NodeCache+ PathChildrenCache 监听当前节点及其下的所有子节点的变化。
--- 6.1 单个节点 (5.0之前版本)
//----------------- 监听单个节点 -----------------------------------
//1. 创建节点数据监听对象
final NodeCache nodeCache = new NodeCache(client, "/app1");
//2. 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
//如果节点数据有变化,会回调该方法
public void nodeChanged() throws Exception {
System.out.println("节点变化了~");
//获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3.开始监听
/**
* start参数为true:可以直接获取监听的节点,
* System.out.println(nodeCache.getCurrentData());为ChildData {path='/aa', stat=607,765,1580205779732,1580973376268,2,1,0,0,5,1,608, data=[97, 98, 99, 100, 101]}
* 参数为false:不可以获取监听的节点,System.out.println(nodeCache.getCurrentData());为null
*/
nodeCache.start(true);
--- 6.2 子节点 (5.0之前版本)
//----------------- 监听子节点 -----------------------------------
//1.创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
//2. 绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
System.out.println("子节点变化了~");
System.out.println(event);
//监听子节点的数据变更,并且拿到变更后的数据
//1.获取类型
PathChildrenCacheEvent.Type type = event.getType();
//2.判断类型是否是update
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
System.out.println("数据变了!!!");
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
}
});
//3. 开启
pathChildrenCache.start();
--- 6.3 节点树 (5.0之前版本)
//----------------- 监听节点树 -----------------------------------
//1. 创建监听器
TreeCache treeCache = new TreeCache(client,"/app2");
//2. 注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) {
System.out.println("节点变化了");
System.out.println(event);
}
});
//3. 开启监听
treeCache.start();
--- 6.4 5.0之后的API
在5.0 Curator版本后以上几个类被标识过期,被CuratorCache取代。使用方法也会有所不同。
// 创建CuratorCache实例,基于路径/father/son/grandson1(这里说的路径都是基于命名空间下的路径)
// 缓存构建选项是SINGLE_NODE_CACHE
CuratorCache cache = CuratorCache.build(client, "/father/son/grandson1",
CuratorCache.Options.SINGLE_NODE_CACHE);
// 创建一系列CuratorCache监听器,都是通过lambda表达式指定
CuratorCacheListener listener = CuratorCacheListener.builder()
// 初始化完成时调用
.forInitialized(() -> System.out.println("[forInitialized] : Cache initialized"))
// 添加或更改缓存中的数据时调用
.forCreatesAndChanges(
(oldNode, node) -> System.out.printf("[forCreatesAndChanges] : Node changed: Old: [%s] New: [%s]\n",
oldNode, node)
)
// 添加缓存中的数据时调用
.forCreates(childData -> System.out.printf("[forCreates] : Node created: [%s]\n", childData))
// 更改缓存中的数据时调用
.forChanges(
(oldNode, node) -> System.out.printf("[forChanges] : Node changed: Old: [%s] New: [%s]\n",
oldNode, node)
)
// 删除缓存中的数据时调用
.forDeletes(childData -> System.out.printf("[forDeletes] : Node deleted: data: [%s]\n", childData))
.forPathChildrenCache(client, (curatorFramework, pathChildrenCacheEvent) -> {
System.out.println("子节点变化了~");
//监听子节点的数据变更,并且拿到变更后的数据
//1.获取类型
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
System.out.println(type);
//2.判断类型是否是update
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
System.out.println("数据变了!!!");
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
})
// 添加、更改或删除缓存中的数据时调用
.forAll((type, oldData, data) -> System.out.printf("[forAll] : type: [%s] [%s] [%s]\n", type, oldData, data))
.build();
// 给CuratorCache实例添加监听器
cache.listenable().addListener(listener);
// 启动CuratorCache
cache.start();
7. 分布式锁
curator直接给出了分布式锁的实现。原理是客户端创建锁节点,执行完毕后再删除锁节点。一个客户端先检查是否有锁节点,如果没有,说明可以执行,则创建锁节点去执行。如果有锁节点,则说明现在锁在别的客户端那里,自己则需要等待。
String lockPath = "/123/111";
InterProcessMutex lock = new InterProcessMutex(client,lockPath);
lock.acquire();
//do something
lock.release();
8. leader选举
在分布式计算中,leader election是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是leader或者coordinator。当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务leader。除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。
Curator有两种选举recipe,你可以根据你的需求选择合适的。
--- 8.1 Leader Latch
实例被选为leader后,执行isLeader中的逻辑。当领导权易主之后才会再次执行isLeader。
/*
* isLeader 中的方法会在实例被选为主节点后被执行, 而notLeader中的不会被执行
* 如果主节点被失效, 会进行重新选主
* */
public void setLeaderLatch(String path) {
try {
String id = "client#" + InetAddress.getLocalHost().getHostAddress();
leaderLatch = new LeaderLatch(client, path, id);
LeaderLatchListener leaderLatchListener = new LeaderLatchListener() {
@Override
public void isLeader() {
logger.info("[LeaderLatch]我是主节点, id={}", leaderLatch.getId());
}
@Override
public void notLeader() {
logger.info("[LeaderLatch]我不是主节点, id={}", leaderLatch.getId());
}
};
leaderLatch.addListener(leaderLatchListener);
leaderLatch.start();
} catch (Exception e) {
logger.error("c创建LeaderLatch失败, path={}", path);
}
}
/*
* 判断实例是否是主节点
* */
public boolean hasLeadershipByLeaderLatch() {
return leaderLatch.hasLeadership();
}
/*
* 阻塞直到获得领导权
* */
public void awaitByLeaderLatch() {
try {
leaderLatch.await();
} catch (InterruptedException | EOFException e) {
e.printStackTrace();
}
}
/*
* 尝试获得领导权并超时
* */
public boolean awaitByLeaderLatch(long timeout, TimeUnit unit) {
boolean hasLeadership = false;
try {
hasLeadership = leaderLatch.await(timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
}
return hasLeadership;
}
- 创建LeaderLatch需要两个参数:curator框架的client和latch path。其中latch path 参数至关重要。要求参与leader选举的所有节点必须使用相同的latch path。从另一个角度理解就是说,Curator保证,使用相同latch path的节点,同一时间只会有一个获取到leader角色。
- 使用LeaderLatch之前,必须调用它的start方法,开始选举过程。
- LeaderLatch的addListener方法为LeaderLatch增加一个监听器。该监听器会在leader状态发生变化之时(即hasLeadership)调用。
- hasLeadership方法返回是否为leader角色。
- await方法调用的时候,如果目前为leader角色,方法会立即返回并继续向下运行。如果当前不是leader角色,该方法会阻塞,直到被中断,leaderlatch被关闭或者获取到leader角色。
- await方法还有一个重载版本await(long timeout, TimeUnit unit)。这个方法可以指定最大等待时间。如果当前不为leader,运行这个方法的时候会阻塞,直到阻塞时间超过最大等待时间。
--- 8.2 Leader Election
当实例被选为leader之后,调用takeLeadership方法进行业务逻辑处理,处理完成即释放领导权。其中autoRequeue()方法的调用确保此实例在释放领导权后还可能获得领导权。
LeaderSelector selector = new LeaderSelector(client, "/tmp/leader/master", new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
//成为leader了
System.out.println("do leader work");
Thread.sleep(5000);
System.out.println("end work");
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("stateChanged:"+newState);
}
});
selector.autoRequeue();
selector.start();
Thread.sleep(Integer.MAX_VALUE);