首发于工具使用

Curator的基本使用

一. 前言

官网 : Apache Curator

Curator是netflix公司开源的一套zookeeper客户端。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。

curator它主要包含三个依赖(curator的依赖都已经放到maven仓库,你直接使用maven来构建它。对于大多数人来说,我们可能最常需要引入的是curator-recipes):

二. 版本问题

常用版本 :

Curator 2.x.x-兼容两个zk版本 ( zk 3.4.x 和zk 3.5.x ),Curator 3.x.x-兼容zk 3.5。
因此为了不必要的麻烦,我们推荐使用2.x.x。

最新版兼容模式的介绍Apache Curator – 版本

说的是 Zookeeper 3.4.x 版本已经是最后一个版本,curator 最新版本移除了对 3.4.x 的支持。如果你想在 Zookeeper 3.4.x 中使用Curator ,可以选择 4.2.x 版本的 curator。curator 4.2.x 版本和 zookeeper 3.4.x 版本会在兼容模式下运行。为了使用这种模式,你必须在版本管理工具中移除对 Zookeeper 的依赖,并且重新添加对 Zookeeper 的依赖。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>3.4.14</version>
</dependency>

三. Curator使用

1. 创建客户端连接

//3000为每次重试间隔时间(单位:毫秒),10为最大重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);

//-------------------第一种方式------------------------
//参数1:服务端地址+端口号,参数2:会话超时时间,参数3:连接超时时间
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.170.148:2181", 
               60 * 1000, 15 * 1000, retryPolicy);

//-------------------第二种方式------------------------
CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.170.148:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).build();

//开启连接
client.start();

2. 创建节点

//1、基本创建
String path = client.create().forPath("/comm_msg_nd","ctx".getBytes());
String path1 = client.create().forPath("/comm_no_msg_nd");

//2、创建临时节点
String path2 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app4");

//3、创建多级节点
String path3 = client.create().creatingParentsIfNeeded().forPath("/app5/p1");

需要注意,只有叶节点可以做临时节点,所以叶节点的父节点必须是永久节点,也就是creatingParentsIfNeeded这个方法创建的父节点必须是永久节点。

3. 删除节点

//删除单独一个节点
client.delete().forPath("/comm_msg_nd");
//删除多级节点
client.create().creatingParentsIfNeeded().forPath("/p1/p2/p3/multi_nd");
client.delete().deletingChildrenIfNeeded().forPath("/p1");
//删除指定版本的节点
client.delete().withVersion(0).forPath("/comm_msg_nd");
//保证删除,失败后继续执行
client.delete().guaranteed().forPath("/comm_msg_nd");
// 删除后回调
client.delete().inBackground((curatorFramework1, curatorEvent) -> {
	System.out.println(curatorFramework1);
	System.out.println(curatorEvent);
}).forPath("/app1");

4. 查询节点

//1、查询数据:get
byte[] data = client.getData().forPath("/app1");
System.out.println(new String(data));

//2、查询子节点:ls
List<String> list = client.getChildren().forPath("/");
System.out.println(list);

//3、查询节点状态信息:ls -s
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);

5. 修改节点

//1、修改节点数据(基本修改)
client.setData().forPath("/app1", "333".getBytes(StandardCharsets.UTF_8));

//2、根据版本号修改
Stat stat1 = new Stat();
client.getData().storingStatIn(stat1).forPath("/app1");
client.setData().withVersion(stat1.getVersion()).forPath("/app1", "itcast".getBytes(StandardCharsets.UTF_8));

6. 事件监听

zookeeper原生支持通过注册watcher来进行事件监听,但是其使用不是特别方便,需要开发人员自己反复注册watcher,比较繁琐。Curator引入Cache来实现对zookeeper服务端事务的监听。Cache是Curator中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程Zookeeper视图的对比过程。同时Curator能够自动为开发人员处理反复注册监听,从而大大简化原生api开发的繁琐过程。

cache分为三类监听类型,单个节点监听,子节点监听,节点树监听。

  • NodeCache(监听和缓存根节点变化) 只监听单一个节点(变化 添加,修改,删除)。
  • PathChildrenCache(监听和缓存子节点变化) 监听这个节点下的所有子节点(变化 添加,修改,删除)。
  • TreeCache(监听和缓存根节点变化和子节点变化) NodeCache+ PathChildrenCache 监听当前节点及其下的所有子节点的变化。

--- 6.1 单个节点 (5.0之前版本)

//----------------- 监听单个节点 -----------------------------------
//1. 创建节点数据监听对象
final NodeCache nodeCache = new NodeCache(client, "/app1");

//2. 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
	@Override
	//如果节点数据有变化,会回调该方法
	public void nodeChanged() throws Exception {
		System.out.println("节点变化了~");
		//获取修改节点后的数据
		byte[] data = nodeCache.getCurrentData().getData();
		System.out.println(new String(data));
	}
});
//3.开始监听
/**
 * start参数为true:可以直接获取监听的节点,
 * System.out.println(nodeCache.getCurrentData());为ChildData {path='/aa', stat=607,765,1580205779732,1580973376268,2,1,0,0,5,1,608, data=[97, 98, 99, 100, 101]}
 * 参数为false:不可以获取监听的节点,System.out.println(nodeCache.getCurrentData());为null
 */
nodeCache.start(true);

--- 6.2 子节点 (5.0之前版本)

//----------------- 监听子节点 -----------------------------------
//1.创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
//2. 绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
	@Override
	public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
		System.out.println("子节点变化了~");
		System.out.println(event);
		//监听子节点的数据变更,并且拿到变更后的数据
		//1.获取类型
		PathChildrenCacheEvent.Type type = event.getType();
		//2.判断类型是否是update
		if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
			System.out.println("数据变了!!!");
			byte[] data = event.getData().getData();
			System.out.println(new String(data));
		}
	}
});
//3. 开启
pathChildrenCache.start();

--- 6.3 节点树 (5.0之前版本)

//----------------- 监听节点树 -----------------------------------
//1. 创建监听器
TreeCache treeCache = new TreeCache(client,"/app2");
//2. 注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
	@Override
	public void childEvent(CuratorFramework client, TreeCacheEvent event) {
		System.out.println("节点变化了");
		System.out.println(event);
	}
});
//3. 开启监听
treeCache.start();

--- 6.4 5.0之后的API

在5.0 Curator版本后以上几个类被标识过期,被CuratorCache取代。使用方法也会有所不同。

// 创建CuratorCache实例,基于路径/father/son/grandson1(这里说的路径都是基于命名空间下的路径)
// 缓存构建选项是SINGLE_NODE_CACHE
CuratorCache cache = CuratorCache.build(client, "/father/son/grandson1",
		CuratorCache.Options.SINGLE_NODE_CACHE);

// 创建一系列CuratorCache监听器,都是通过lambda表达式指定
CuratorCacheListener listener = CuratorCacheListener.builder()
		// 初始化完成时调用
		.forInitialized(() -> System.out.println("[forInitialized] : Cache initialized"))
		// 添加或更改缓存中的数据时调用
		.forCreatesAndChanges(
				(oldNode, node) -> System.out.printf("[forCreatesAndChanges] : Node changed: Old: [%s] New: [%s]\n",
						oldNode, node)
		)
		// 添加缓存中的数据时调用
		.forCreates(childData -> System.out.printf("[forCreates] : Node created: [%s]\n", childData))
		// 更改缓存中的数据时调用
		.forChanges(
				(oldNode, node) -> System.out.printf("[forChanges] : Node changed: Old: [%s] New: [%s]\n",
						oldNode, node)
		)
		// 删除缓存中的数据时调用
		.forDeletes(childData -> System.out.printf("[forDeletes] : Node deleted: data: [%s]\n", childData))
		.forPathChildrenCache(client, (curatorFramework, pathChildrenCacheEvent) -> {
			System.out.println("子节点变化了~");
			//监听子节点的数据变更,并且拿到变更后的数据
			//1.获取类型
			PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
			System.out.println(type);
			//2.判断类型是否是update
			if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
				System.out.println("数据变了!!!");
				byte[] data = pathChildrenCacheEvent.getData().getData();
				System.out.println(new String(data));

			}
		})
		// 添加、更改或删除缓存中的数据时调用
		.forAll((type, oldData, data) -> System.out.printf("[forAll] : type: [%s] [%s] [%s]\n", type, oldData, data))
		.build();

// 给CuratorCache实例添加监听器
cache.listenable().addListener(listener);

// 启动CuratorCache
cache.start();

7. 分布式锁

curator直接给出了分布式锁的实现。原理是客户端创建锁节点,执行完毕后再删除锁节点。一个客户端先检查是否有锁节点,如果没有,说明可以执行,则创建锁节点去执行。如果有锁节点,则说明现在锁在别的客户端那里,自己则需要等待。

String lockPath = "/123/111";
InterProcessMutex lock = new InterProcessMutex(client,lockPath);
lock.acquire();
//do something
lock.release();

8. leader选举

在分布式计算中,leader election是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是leader或者coordinator。当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务leader。除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。

Curator有两种选举recipe,你可以根据你的需求选择合适的。

--- 8.1 Leader Latch

实例被选为leader后,执行isLeader中的逻辑。当领导权易主之后才会再次执行isLeader。

/*
    *  isLeader 中的方法会在实例被选为主节点后被执行, 而notLeader中的不会被执行
    *  如果主节点被失效, 会进行重新选主
    * */
    public void setLeaderLatch(String path) {
        try {
            String id = "client#" + InetAddress.getLocalHost().getHostAddress();
            leaderLatch = new LeaderLatch(client, path, id);
            LeaderLatchListener leaderLatchListener = new LeaderLatchListener() {
                @Override
                public void isLeader() {
                    logger.info("[LeaderLatch]我是主节点, id={}", leaderLatch.getId());
                }

                @Override
                public void notLeader() {
                    logger.info("[LeaderLatch]我不是主节点, id={}", leaderLatch.getId());
                }
            };
            leaderLatch.addListener(leaderLatchListener);
            leaderLatch.start();
        } catch (Exception e) {
            logger.error("c创建LeaderLatch失败, path={}", path);
        }
    }

    /*
    *   判断实例是否是主节点
    * */
    public boolean hasLeadershipByLeaderLatch() {
        return leaderLatch.hasLeadership();
    }

    /*
    *   阻塞直到获得领导权
    * */
    public void awaitByLeaderLatch() {
        try {
            leaderLatch.await();
        } catch (InterruptedException | EOFException e) {
            e.printStackTrace();
        }
    }

    /*
    *   尝试获得领导权并超时
    * */
    public boolean awaitByLeaderLatch(long timeout, TimeUnit unit) {
        boolean hasLeadership = false;
        try {
            hasLeadership = leaderLatch.await(timeout, unit);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return  hasLeadership;
    }
  1. 创建LeaderLatch需要两个参数:curator框架的client和latch path。其中latch path 参数至关重要。要求参与leader选举的所有节点必须使用相同的latch path。从另一个角度理解就是说,Curator保证,使用相同latch path的节点,同一时间只会有一个获取到leader角色。
  2. 使用LeaderLatch之前,必须调用它的start方法,开始选举过程。
  3. LeaderLatch的addListener方法为LeaderLatch增加一个监听器。该监听器会在leader状态发生变化之时(即hasLeadership)调用。
  4. hasLeadership方法返回是否为leader角色。
  5. await方法调用的时候,如果目前为leader角色,方法会立即返回并继续向下运行。如果当前不是leader角色,该方法会阻塞,直到被中断,leaderlatch被关闭或者获取到leader角色。
  6. await方法还有一个重载版本await(long timeout, TimeUnit unit)。这个方法可以指定最大等待时间。如果当前不为leader,运行这个方法的时候会阻塞,直到阻塞时间超过最大等待时间。

--- 8.2 Leader Election

当实例被选为leader之后,调用takeLeadership方法进行业务逻辑处理,处理完成即释放领导权。其中autoRequeue()方法的调用确保此实例在释放领导权后还可能获得领导权。

LeaderSelector selector = new LeaderSelector(client, "/tmp/leader/master", new LeaderSelectorListener() {
	@Override
	public void takeLeadership(CuratorFramework client) throws Exception {
		//成为leader了
		System.out.println("do leader work");
		Thread.sleep(5000);
		System.out.println("end work");
	}

	@Override
	public void stateChanged(CuratorFramework client, ConnectionState newState) {
		System.out.println("stateChanged:"+newState);
	}
});
selector.autoRequeue();
selector.start();
Thread.sleep(Integer.MAX_VALUE);

编辑于 2023-03-04 20:30・IP 属地北京