Zookeeper API

ZooKeeper有一个绑定Java和C的官方API。Zookeeper社区为大多数语言(.NET,python等)提供非官方API。使用ZooKeeper API,应用程序可以连接,交互,操作数据,协调,最后断开与ZooKeeper集合的连接。

ZooKeeper API具有丰富的功能,以简单和安全的方式获得ZooKeeper集合的所有功能。ZooKeeper API提供同步和异步方法。

ZooKeeper集合和ZooKeeper API在各个方面都完全相辅相成,对开发人员有很大的帮助。让我们在本章讨论Java绑定。

ZooKeeper API的基础知识

与ZooKeeper集合进行交互的应用程序称为 ZooKeeper客户端或简称客户端

Znode是ZooKeeper集合的核心组件,ZooKeeper API提供了一小组方法使用ZooKeeper集合来操纵znode的所有细节。

客户端应该遵循以步骤,与ZooKeeper集合进行清晰和干净的交互。

  • 连接到ZooKeeper集合。ZooKeeper集合为客户端分配会话ID。

  • 定期向服务器发送心跳。否则,ZooKeeper集合将过期会话ID,客户端需要重新连接。

  • 只要会话ID处于活动状态,就可以获取/设置znode。

  • 所有任务完成后,断开与ZooKeeper集合的连接。如果客户端长时间不活动,则ZooKeeper集合将自动断开客户端。

Java绑定

让我们来了解本章中最重要的一组ZooKeeper API。ZooKeeper API的核心部分是ZooKeeper类。它提供了在其构造函数中连接ZooKeeper集合的选项,并具有以下方法:

  • connect - 连接到ZooKeeper集合

  • create- 创建znode

  • exists- 检查znode是否存在及其信息

  • getData - 从特定的znode获取数据

  • setData - 在特定的znode中设置数据

  • getChildren - 获取特定znode中的所有子节点

  • delete - 删除特定的znode及其所有子项

  • close - 关闭连接

连接到ZooKeeper集合

ZooKeeper类通过其构造函数提供connect功能。构造函数的签名如下 :

  1. ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
  • connectionString - ZooKeeper集合主机。

  • sessionTimeout - 会话超时(以毫秒为单位)。

  • watcher - 实现“监视器”界面的对象。ZooKeeper集合通过监视器对象返回连接状态。

让我们创建一个新的帮助类 ZooKeeperConnection ,并添加一个方法 connect connect 方法创建一个ZooKeeper对象,连接到ZooKeeper集合,然后返回对象。

这里 CountDownLatch 用于停止(等待)主进程,直到客户端与ZooKeeper集合连接。

ZooKeeper集合通过监视器回调来回复连接状态。一旦客户端与ZooKeeper集合连接,监视器回调就会被调用,并且监视器回调函数调用CountDownLatchcountDown方法来释放锁,在主进程中await

以下是与ZooKeeper集合连接的完整代码。

编码:ZooKeeperConnection.java

  1. // import java classes
  2. import java.io.IOException;
  3. import java.util.concurrent.CountDownLatch;
  4.  
  5. // import zookeeper classes
  6. import org.apache.zookeeper.KeeperException;
  7. import org.apache.zookeeper.WatchedEvent;
  8. import org.apache.zookeeper.Watcher;
  9. import org.apache.zookeeper.Watcher.Event.KeeperState;
  10. import org.apache.zookeeper.ZooKeeper;
  11. import org.apache.zookeeper.AsyncCallback.StatCallback;
  12. import org.apache.zookeeper.KeeperException.Code;
  13. import org.apache.zookeeper.data.Stat;
  14.  
  15. public class ZooKeeperConnection {
  16.  
  17. // declare zookeeper instance to access ZooKeeper ensemble
  18. private ZooKeeper zoo;
  19. final CountDownLatch connectedSignal = new CountDownLatch(1);
  20.  
  21. // Method to connect zookeeper ensemble.
  22. public ZooKeeper connect(String host) throws IOException,InterruptedException {
  23.  
  24. zoo = new ZooKeeper(host,5000,new Watcher() {
  25.  
  26. public void process(WatchedEvent we) {
  27.  
  28. if (we.getState() == KeeperState.SyncConnected) {
  29. connectedSignal.countDown();
  30. }
  31. }
  32. });
  33.  
  34. connectedSignal.await();
  35. return zoo;
  36. }
  37.  
  38. // Method to disconnect from zookeeper server
  39. public void close() throws InterruptedException {
  40. zoo.close();
  41. }
  42. }

保存上面的代码,它将在下一节中用于连接ZooKeeper集合。

创建Znode

ZooKeeper类提供了在ZooKeeper集合中创建一个新的znode的create方法。 create 方法的签名如下:

  1. create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
  • path - Znode路径。例如,/myapp1,/myapp2,/myapp1/mydata1,myapp2/mydata1/myanothersubdata

  • data - 要存储在指定znode路径中的数据

  • acl - 要创建的节点的访问控制列表。ZooKeeper API提供了一个静态接口 ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE返回打开znode的acl列表。

  • createMode - 节点的类型,即临时,顺序或两者。这是一个枚举

让我们创建一个新的Java应用程序来检查ZooKeeper API的 create 功能。创建文件 ZKCreate.java 。在main方法中,创建一个类型为 ZooKeeperConnection 的对象,并调用 connect 方法连接到ZooKeeper集合。

connect方法将返回ZooKeeper对象 zk 。现在,请使用自定义pathdata调用 zk 对象的 create 方法。

创建znode的完整程序代码如下:

编码:ZKCreate.java

  1. import java.io.IOException;
  2.  
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.Watcher.Event.KeeperState;
  6. import org.apache.zookeeper.ZooKeeper;
  7. import org.apache.zookeeper.KeeperException;
  8. import org.apache.zookeeper.CreateMode;
  9. import org.apache.zookeeper.ZooDefs;
  10.  
  11. public class ZKCreate {
  12. // create static instance for zookeeper class.
  13. private static ZooKeeper zk;
  14.  
  15. // create static instance for ZooKeeperConnection class.
  16. private static ZooKeeperConnection conn;
  17.  
  18. // Method to create znode in zookeeper ensemble
  19. public static void create(String path, byte[] data) throws
  20. KeeperException,InterruptedException {
  21. zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  22. CreateMode.PERSISTENT);
  23. }
  24.  
  25. public static void main(String[] args) {
  26.  
  27. // znode path
  28. String path = "/MyFirstZnode"; // Assign path to znode
  29.  
  30. // data in byte array
  31. byte[] data = "My first zookeeper app".getBytes(); // Declare data
  32.  
  33. try {
  34. conn = new ZooKeeperConnection();
  35. zk = conn.connect("localhost");
  36. create(path, data); // Create the data to the specified path
  37. conn.close();
  38. } catch (Exception e) {
  39. System.out.println(e.getMessage()); //Catch error message
  40. }
  41. }
  42. }

一旦编译和执行应用程序,将在ZooKeeper集合中创建具有指定数据的znode。你可以使用ZooKeeper CLI zkCli.sh 进行检查。

  1. cd /path/to/zookeeper
  2. bin/zkCli.sh
  3. >>> get /MyFirstZnode

Exists - 检查Znode的存在

ZooKeeper类提供了 exists 方法来检查znode的存在。如果指定的znode存在,则返回一个znode的元数据。exists方法的签名如下:

  1. exists(String path, boolean watcher)
  • path- Znode路径

  • watcher - 布尔值,用于指定是否监视指定的znode

让我们创建一个新的Java应用程序来检查ZooKeeper API的“exists”功能。创建文件“ZKExists.java”。在main方法中,使用“ZooKeeperConnection”对象创建ZooKeeper对象“zk”。然后,使用自定义“path”调用“zk”对象的“exists”方法。完整的列表如下:

编码:ZKExists.java

  1. import java.io.IOException;
  2.  
  3. import org.apache.zookeeper.ZooKeeper;
  4. import org.apache.zookeeper.KeeperException;
  5. import org.apache.zookeeper.WatchedEvent;
  6. import org.apache.zookeeper.Watcher;
  7. import org.apache.zookeeper.Watcher.Event.KeeperState;
  8. import org.apache.zookeeper.data.Stat;
  9.  
  10. public class ZKExists {
  11. private static ZooKeeper zk;
  12. private static ZooKeeperConnection conn;
  13.  
  14. // Method to check existence of znode and its status, if znode is available.
  15. public static Stat znode_exists(String path) throws
  16. KeeperException,InterruptedException {
  17. return zk.exists(path, true);
  18. }
  19.  
  20. public static void main(String[] args) throws InterruptedException,KeeperException {
  21. String path = "/MyFirstZnode"; // Assign znode to the specified path
  22.  
  23. try {
  24. conn = new ZooKeeperConnection();
  25. zk = conn.connect("localhost");
  26. Stat stat = znode_exists(path); // Stat checks the path of the znode
  27.  
  28. if(stat != null) {
  29. System.out.println("Node exists and the node version is " +
  30. stat.getVersion());
  31. } else {
  32. System.out.println("Node does not exists");
  33. }
  34.  
  35. } catch(Exception e) {
  36. System.out.println(e.getMessage()); // Catches error messages
  37. }
  38. }
  39. }

一旦编译和执行应用程序,你将获得以下输出。

  1. Node exists and the node version is 1.

getData方法

ZooKeeper类提供 getData 方法来获取附加在指定znode中的数据及其状态。 getData 方法的签名如下:

  1. getData(String path, Watcher watcher, Stat stat)
  • path - Znode路径。

  • watcher - 监视器类型的回调函数。当指定的znode的数据改变时,ZooKeeper集合将通过监视器回调进行通知。这是一次性通知。

  • stat - 返回znode的元数据。

让我们创建一个新的Java应用程序来了解ZooKeeper API的 getData 功能。创建文件 ZKGetData.java 。在main方法中,使用 ZooKeeperConnection 对象创建一个ZooKeeper对象 zk 。然后,使用自定义路径调用zk对象的 getData 方法。

下面是从指定节点获取数据的完整程序代码:

编码:ZKGetData.java

  1. import java.io.IOException;
  2. import java.util.concurrent.CountDownLatch;
  3.  
  4. import org.apache.zookeeper.ZooKeeper;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.WatchedEvent;
  7. import org.apache.zookeeper.Watcher;
  8. import org.apache.zookeeper.Watcher.Event.KeeperState;
  9. import org.apache.zookeeper.data.Stat;
  10.  
  11. public class ZKGetData {
  12.  
  13. private static ZooKeeper zk;
  14. private static ZooKeeperConnection conn;
  15. public static Stat znode_exists(String path) throws
  16. KeeperException,InterruptedException {
  17. return zk.exists(path,true);
  18. }
  19.  
  20. public static void main(String[] args) throws InterruptedException, KeeperException {
  21. String path = "/MyFirstZnode";
  22. final CountDownLatch connectedSignal = new CountDownLatch(1);
  23.  
  24. try {
  25. conn = new ZooKeeperConnection();
  26. zk = conn.connect("localhost");
  27. Stat stat = znode_exists(path);
  28.  
  29. if(stat != null) {
  30. byte[] b = zk.getData(path, new Watcher() {
  31.  
  32. public void process(WatchedEvent we) {
  33.  
  34. if (we.getType() == Event.EventType.None) {
  35. switch(we.getState()) {
  36. case Expired:
  37. connectedSignal.countDown();
  38. break;
  39. }
  40.  
  41. } else {
  42. String path = "/MyFirstZnode";
  43.  
  44. try {
  45. byte[] bn = zk.getData(path,
  46. false, null);
  47. String data = new String(bn,
  48. "UTF-8");
  49. System.out.println(data);
  50. connectedSignal.countDown();
  51.  
  52. } catch(Exception ex) {
  53. System.out.println(ex.getMessage());
  54. }
  55. }
  56. }
  57. }, null);
  58.  
  59. String data = new String(b, "UTF-8");
  60. System.out.println(data);
  61. connectedSignal.await();
  62.  
  63. } else {
  64. System.out.println("Node does not exists");
  65. }
  66. } catch(Exception e) {
  67. System.out.println(e.getMessage());
  68. }
  69. }
  70. }

一旦编译和执行应用程序,你将获得以下输出

  1. My first zookeeper app

应用程序将等待ZooKeeper集合的进一步通知。使用ZooKeeper CLI zkCli.sh 更改指定znode的数据。

  1. cd /path/to/zookeeper
  2. bin/zkCli.sh
  3. >>> set /MyFirstZnode Hello

现在,应用程序将打印以下输出并退出。

  1. Hello

setData方法

ZooKeeper类提供 setData 方法来修改指定znode中附加的数据。 setData 方法的签名如下:

  1. setData(String path, byte[] data, int version)
  • path- Znode路径

  • data - 要存储在指定znode路径中的数据。

  • version- znode的当前版本。每当数据更改时,ZooKeeper会更新znode的版本号。

现在让我们创建一个新的Java应用程序来了解ZooKeeper API的 setData 功能。创建文件 ZKSetData.java 。在main方法中,使用 ZooKeeperConnection 对象创建一个ZooKeeper对象 zk 。然后,使用指定的路径,新数据和节点版本调用 zk 对象的 setData 方法。

以下是修改附加在指定znode中的数据的完整程序代码。

编码:ZKSetData.java

  1. import org.apache.zookeeper.ZooKeeper;
  2. import org.apache.zookeeper.KeeperException;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.Watcher.Event.KeeperState;
  6.  
  7. import java.io.IOException;
  8.  
  9. public class ZKSetData {
  10. private static ZooKeeper zk;
  11. private static ZooKeeperConnection conn;
  12.  
  13. // Method to update the data in a znode. Similar to getData but without watcher.
  14. public static void update(String path, byte[] data) throws
  15. KeeperException,InterruptedException {
  16. zk.setData(path, data, zk.exists(path,true).getVersion());
  17. }
  18.  
  19. public static void main(String[] args) throws InterruptedException,KeeperException {
  20. String path= "/MyFirstZnode";
  21. byte[] data = "Success".getBytes(); //Assign data which is to be updated.
  22.  
  23. try {
  24. conn = new ZooKeeperConnection();
  25. zk = conn.connect("localhost");
  26. update(path, data); // Update znode data to the specified path
  27. } catch(Exception e) {
  28. System.out.println(e.getMessage());
  29. }
  30. }
  31. }

编译并执行应用程序后,指定的znode的数据将被改变,并且可以使用ZooKeeper CLI zkCli.sh 进行检查。

  1. cd /path/to/zookeeper
  2. bin/zkCli.sh
  3. >>> get /MyFirstZnode

getChildren方法

ZooKeeper类提供 getChildren 方法来获取特定znode的所有子节点。 getChildren 方法的签名如下:

  1. getChildren(String path, Watcher watcher)
  • path - Znode路径。

  • watcher - 监视器类型的回调函数。当指定的znode被删除或znode下的子节点被创建/删除时,ZooKeeper集合将进行通知。这是一次性通知。

编码:ZKGetChildren.java

  1. import java.io.IOException;
  2. import java.util.*;
  3.  
  4. import org.apache.zookeeper.ZooKeeper;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.WatchedEvent;
  7. import org.apache.zookeeper.Watcher;
  8. import org.apache.zookeeper.Watcher.Event.KeeperState;
  9. import org.apache.zookeeper.data.Stat;
  10.  
  11. public class ZKGetChildren {
  12. private static ZooKeeper zk;
  13. private static ZooKeeperConnection conn;
  14.  
  15. // Method to check existence of znode and its status, if znode is available.
  16. public static Stat znode_exists(String path) throws
  17. KeeperException,InterruptedException {
  18. return zk.exists(path,true);
  19. }
  20.  
  21. public static void main(String[] args) throws InterruptedException,KeeperException {
  22. String path = "/MyFirstZnode"; // Assign path to the znode
  23.  
  24. try {
  25. conn = new ZooKeeperConnection();
  26. zk = conn.connect("localhost");
  27. Stat stat = znode_exists(path); // Stat checks the path
  28.  
  29. if(stat!= null) {
  30.  
  31. //“getChildren" method- get all the children of znode.It has two
  32. args, path and watch
  33. List <String> children = zk.getChildren(path, false);
  34. for(int i = 0; i < children.size(); i++)
  35. System.out.println(children.get(i)); //Print children's
  36. } else {
  37. System.out.println("Node does not exists");
  38. }
  39.  
  40. } catch(Exception e) {
  41. System.out.println(e.getMessage());
  42. }
  43.  
  44. }
  45.  
  46. }

在运行程序之前,让我们使用ZooKeeper CLI zkCli.sh /MyFirstZnode 创建两个子节点。

  1. cd /path/to/zookeeper
  2. bin/zkCli.sh
  3. >>> create /MyFirstZnode/myfirstsubnode Hi
  4. >>> create /MyFirstZnode/mysecondsubmode Hi

现在,编译和运行程序将输出上面创建的znode。

  1. myfirstsubnode
  2. mysecondsubnode

删除Znode

ZooKeeper类提供了 delete 方法来删除指定的znode。delete 方法的签名如下:

  1. delete(String path, int version)
  • path - Znode路径。

  • version - znode的当前版本。

让我们创建一个新的Java应用程序来了解ZooKeeper API的 delete 功能。创建文件 ZKDelete.java 。在main方法中,使用 ZooKeeperConnection 对象创建一个ZooKeeper对象 zk 。然后,使用指定的路径和版本号调用 zk 对象的 delete 方法。

删除znode的完整程序代码如下:

编码:ZKDelete.java

  1. import org.apache.zookeeper.ZooKeeper;
  2. import org.apache.zookeeper.KeeperException;
  3.  
  4. public class ZKDelete {
  5. private static ZooKeeper zk;
  6. private static ZooKeeperConnection conn;
  7.  
  8. // Method to check existence of znode and its status, if znode is available.
  9. public static void delete(String path) throws KeeperException,InterruptedException {
  10. zk.delete(path,zk.exists(path,true).getVersion());
  11. }
  12.  
  13. public static void main(String[] args) throws InterruptedException,KeeperException {
  14. String path = "/MyFirstZnode"; //Assign path to the znode
  15.  
  16. try {
  17. conn = new ZooKeeperConnection();
  18. zk = conn.connect("localhost");
  19. delete(path); //delete the node with the specified path
  20. } catch(Exception e) {
  21. System.out.println(e.getMessage()); // catches error messages
  22. }
  23. }
  24. }